From 3e4389ed992380fb3fba2f89cfa869c19b42f730 Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Mon, 11 Nov 2024 19:20:58 +0100 Subject: [PATCH 1/4] Basic YDB ping service --- ydb/core/driver_lib/run/run.cpp | 4 + ydb/core/grpc_services/rpc_ping.cpp | 98 +++++++++ ydb/core/grpc_services/service_debug.h | 14 ++ ydb/core/grpc_services/ya.make | 1 + .../jaeger_tracing/request_discriminator.h | 3 + ydb/core/kqp/common/events/events.h | 6 + ydb/core/kqp/common/simple/kqp_event_ids.h | 2 + .../kqp/proxy_service/kqp_proxy_service.cpp | 5 + ydb/public/api/grpc/ya.make | 1 + ydb/public/api/grpc/ydb_debug_v1.proto | 12 ++ ydb/public/api/protos/ya.make | 1 + ydb/public/api/protos/ydb_debug.proto | 39 ++++ ydb/public/lib/ydb_cli/commands/ya.make | 5 + ydb/public/lib/ydb_cli/commands/ydb_debug.cpp | 13 ++ ydb/public/lib/ydb_cli/commands/ydb_debug.h | 12 ++ ydb/public/lib/ydb_cli/commands/ydb_ping.cpp | 195 ++++++++++++++++++ ydb/public/lib/ydb_cli/commands/ydb_ping.h | 58 ++++++ .../lib/ydb_cli/commands/ydb_root_common.cpp | 2 + .../sdk/cpp/client/ydb_debug/client.cpp | 94 +++++++++ ydb/public/sdk/cpp/client/ydb_debug/client.h | 61 ++++++ ydb/public/sdk/cpp/client/ydb_debug/ya.make | 12 ++ ydb/public/sdk/cpp/ya.make | 1 + ydb/services/ydb/ya.make | 1 + ydb/services/ydb/ydb_debug.cpp | 76 +++++++ ydb/services/ydb/ydb_debug.h | 35 ++++ 25 files changed, 751 insertions(+) create mode 100644 ydb/core/grpc_services/rpc_ping.cpp create mode 100644 ydb/core/grpc_services/service_debug.h create mode 100644 ydb/public/api/grpc/ydb_debug_v1.proto create mode 100644 ydb/public/api/protos/ydb_debug.proto create mode 100644 ydb/public/lib/ydb_cli/commands/ydb_debug.cpp create mode 100644 ydb/public/lib/ydb_cli/commands/ydb_debug.h create mode 100644 ydb/public/lib/ydb_cli/commands/ydb_ping.cpp create mode 100644 ydb/public/lib/ydb_cli/commands/ydb_ping.h create mode 100644 ydb/public/sdk/cpp/client/ydb_debug/client.cpp create mode 100644 ydb/public/sdk/cpp/client/ydb_debug/client.h create mode 100644 ydb/public/sdk/cpp/client/ydb_debug/ya.make create mode 100644 ydb/services/ydb/ydb_debug.cpp create mode 100644 ydb/services/ydb/ydb_debug.h diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index dafe627e7278..e19f131fa9b4 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -123,6 +123,7 @@ #include #include #include +#include #include #include #include @@ -881,6 +882,9 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); } + server.AddService(new NGRpcService::TGRpcYdbDebugService(ActorSystem.Get(), Counters, + grpcRequestProxies, hasDataStreams.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); + if (hasLogStore) { server.AddService(new NGRpcService::TGRpcYdbLogStoreService(ActorSystem.Get(), Counters, grpcRequestProxies[0], hasLogStore.IsRlAllowed())); diff --git a/ydb/core/grpc_services/rpc_ping.cpp b/ydb/core/grpc_services/rpc_ping.cpp new file mode 100644 index 000000000000..f9e81d8a43df --- /dev/null +++ b/ydb/core/grpc_services/rpc_ping.cpp @@ -0,0 +1,98 @@ +#include "service_debug.h" + +#include +#include + +#include + +#include + +namespace NKikimr::NGRpcService { + +namespace { + +using namespace NActors; + +using TEvKqpProxyRequest = TGrpcRequestNoOperationCall; + +class TExecutePingRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::OTHER; + } + + TExecutePingRPC(TEvKqpProxyRequest* request) + : Request_(request) + {} + + void Bootstrap(const TActorContext &ctx) { + this->Become(&TExecutePingRPC::StateWork); + + Proceed(ctx); + } + +private: + void StateWork(TAutoPtr& ev) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvProxyPingResponse, Handle); + default: + UnexpectedEvent(__func__, ev); + } + } catch (const yexception& ex) { + InternalError(ex.what()); + } + } + + void Proceed(const TActorContext &ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to KQP proxy"); + if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), new NKqp::TEvKqp::TEvProxyPingRequest())) { + LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping"); + ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, ctx); + } + } + + void Handle(NKqp::TEvKqp::TEvProxyPingResponse::TPtr&, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response"); + ReplyWithResult(Ydb::StatusIds::SUCCESS, ctx); + } + +private: + void ReplyWithResult(Ydb::StatusIds::StatusCode status, const TActorContext &ctx) { + Request_->ReplyWithYdbStatus(status); + Die(ctx); + } + + void InternalError(const TString& message) { + ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); + ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); + } + + void UnexpectedEvent(const TString& state, TAutoPtr& ev) { + InternalError(TStringBuilder() << "TExecutePingRPC in state " << state << " received unexpected event " << + ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + } + + +private: + std::shared_ptr Request_; +}; + +} // anonymous + +void DoGrpcProxyPing(std::unique_ptr p, const IFacilityProvider& f) { + // we are in the GRPC proxy already (or in the check actor in case of auth check), + // thus ready to reply right here + using TRequest = TGrpcRequestNoOperationCall; + TRequest* request = dynamic_cast(p.get()); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper"); + request->ReplyWithYdbStatus(Ydb::StatusIds::SUCCESS); +} + +void DoKqpPing(std::unique_ptr p, const IFacilityProvider& f) { + auto* request = dynamic_cast(p.release()); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper"); + f.RegisterActor(new TExecutePingRPC(request)); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/service_debug.h b/ydb/core/grpc_services/service_debug.h new file mode 100644 index 000000000000..486c6798db00 --- /dev/null +++ b/ydb/core/grpc_services/service_debug.h @@ -0,0 +1,14 @@ +#pragma once + +#include + +namespace NKikimr::NGRpcService { + +class IRequestOpCtx; +class IRequestNoOpCtx; +class IFacilityProvider; + +void DoGrpcProxyPing(std::unique_ptr p, const IFacilityProvider& f); +void DoKqpPing(std::unique_ptr p, const IFacilityProvider& f); + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index f8bdf44127c4..28240413250e 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -63,6 +63,7 @@ SRCS( rpc_make_directory.cpp rpc_modify_permissions.cpp rpc_monitoring.cpp + rpc_ping.cpp rpc_prepare_data_query.cpp rpc_rate_limiter_api.cpp rpc_read_columns.cpp diff --git a/ydb/core/jaeger_tracing/request_discriminator.h b/ydb/core/jaeger_tracing/request_discriminator.h index 17e4a4e74cb6..2c20bb838706 100644 --- a/ydb/core/jaeger_tracing/request_discriminator.h +++ b/ydb/core/jaeger_tracing/request_discriminator.h @@ -70,6 +70,9 @@ enum class ERequestType: size_t { BSCONFIG_REPLACESTORAGECONFIG, BSCONFIG_FETCHSTORAGECONFIG, + PING_GRPC, + PING_PROXY, + PING_KQP, REQUEST_TYPES_CNT, // Add new types above this line }; diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index ead3d7c25ed9..d2ccf9278d5b 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -204,6 +204,12 @@ struct TEvKqp { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; + + struct TEvProxyPingRequest : public TEventLocal { + }; + + struct TEvProxyPingResponse : public TEventLocal { + }; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index d3c6f3f2fd28..a54408006423 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -48,6 +48,8 @@ struct TKqpEvents { EvDelayedRequestError, EvBufferWrite, EvBufferWriteResult, + EvProxyPingRequest, + EvProxyPingResponse, }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index c46e8bed14f7..b919bf96573a 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -644,6 +644,10 @@ class TKqpProxyService : public TActorBootstrapped { Send(ev->Sender, responseEv.Release(), 0, ev->Cookie); } + void Handle(TEvKqp::TEvProxyPingRequest::TPtr& ev) { + Send(ev->Sender, new TEvKqp::TEvProxyPingResponse()); + } + void Handle(TEvKqp::TEvQueryRequest::TPtr& ev) { if (!DatabasesCache.SetDatabaseIdOrDefer(ev, static_cast(EDelayedRequestType::QueryRequest), ActorContext())) { return; @@ -1346,6 +1350,7 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvents::TEvUndelivered, Handle); hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle); hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); + hFunc(TEvKqp::TEvProxyPingRequest, Handle); hFunc(TEvKqp::TEvQueryRequest, Handle); hFunc(TEvKqp::TEvScriptRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, Handle); diff --git a/ydb/public/api/grpc/ya.make b/ydb/public/api/grpc/ya.make index 81c725240c01..cfd27e416671 100644 --- a/ydb/public/api/grpc/ya.make +++ b/ydb/public/api/grpc/ya.make @@ -10,6 +10,7 @@ SRCS( ydb_auth_v1.proto ydb_cms_v1.proto ydb_coordination_v1.proto + ydb_debug_v1.proto ydb_discovery_v1.proto ydb_export_v1.proto ydb_import_v1.proto diff --git a/ydb/public/api/grpc/ydb_debug_v1.proto b/ydb/public/api/grpc/ydb_debug_v1.proto new file mode 100644 index 000000000000..26aa568b3a01 --- /dev/null +++ b/ydb/public/api/grpc/ydb_debug_v1.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package Ydb.Debug.V1; +option java_package = "com.yandex.ydb.debug.v1"; + +import "ydb/public/api/protos/ydb_debug.proto"; + +service DebugService { + rpc PingPlainGrpc(Debug.PlainGrpcRequest) returns (Debug.PlainGrpcResponse); + rpc PingGrpcProxy(Debug.GrpcProxyRequest) returns (Debug.GrpcProxyResponse); + rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse); +} diff --git a/ydb/public/api/protos/ya.make b/ydb/public/api/protos/ya.make index 64c5510a3e6a..f6aebfb41e7b 100644 --- a/ydb/public/api/protos/ya.make +++ b/ydb/public/api/protos/ya.make @@ -24,6 +24,7 @@ SRCS( ydb_federation_discovery.proto persqueue_error_codes_v1.proto ydb_auth.proto + ydb_debug.proto ydb_persqueue_v1.proto ydb_persqueue_cluster_discovery.proto ydb_clickhouse_internal.proto diff --git a/ydb/public/api/protos/ydb_debug.proto b/ydb/public/api/protos/ydb_debug.proto new file mode 100644 index 000000000000..166a80b78b02 --- /dev/null +++ b/ydb/public/api/protos/ydb_debug.proto @@ -0,0 +1,39 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.Debug; +option java_package = "com.yandex.ydb.debug"; +option java_outer_classname = "DebugProtos"; + +import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/public/api/protos/ydb_status_codes.proto"; + +// just go to GRPC without the rest of YDB + +message PlainGrpcRequest { +} + +message PlainGrpcResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Go until GrpcProxy + +message GrpcProxyRequest { +} + +message GrpcProxyResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Ping KQP proxy without executing anything + +message KqpProxyRequest { +} + +message KqpProxyResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make index bcdb56356382..4389042a6e5b 100644 --- a/ydb/public/lib/ydb_cli/commands/ya.make +++ b/ydb/public/lib/ydb_cli/commands/ya.make @@ -9,7 +9,9 @@ SRCS( query_workload.cpp ydb_admin.cpp ydb_benchmark.cpp + ydb_debug.cpp ydb_dynamic_config.cpp + ydb_ping.cpp ydb_profile.cpp ydb_root_common.cpp ydb_service_auth.cpp @@ -51,6 +53,7 @@ PEERDIR( ydb/public/lib/ydb_cli/topic ydb/public/sdk/cpp/client/draft ydb/public/sdk/cpp/client/ydb_coordination + ydb/public/sdk/cpp/client/ydb_debug ydb/public/sdk/cpp/client/ydb_export ydb/public/sdk/cpp/client/ydb_import ydb/public/sdk/cpp/client/ydb_monitoring @@ -63,6 +66,8 @@ PEERDIR( ydb/public/sdk/cpp/client/ydb_types/credentials/login ) +GENERATE_ENUM_SERIALIZATION(ydb_ping.h) + END() RECURSE( diff --git a/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp b/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp new file mode 100644 index 000000000000..a78e0226c1e0 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/ydb_debug.cpp @@ -0,0 +1,13 @@ +#include "ydb_debug.h" + +#include "ydb_ping.h" + +namespace NYdb::NConsoleClient { + +TCommandDebug::TCommandDebug() + : TClientCommandTree("debug", {}, "YDB cluster debug operations") +{ + AddCommand(std::make_unique()); +} + +} // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_debug.h b/ydb/public/lib/ydb_cli/commands/ydb_debug.h new file mode 100644 index 000000000000..aca53e9c2110 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/ydb_debug.h @@ -0,0 +1,12 @@ +#pragma once + +#include "ydb_command.h" + +namespace NYdb::NConsoleClient { + +class TCommandDebug : public TClientCommandTree { +public: + TCommandDebug(); +}; + +} // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp new file mode 100644 index 000000000000..fe8ebb3354f8 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp @@ -0,0 +1,195 @@ +#include "ydb_ping.h" + +#include +#include + +#include + +namespace NYdb::NConsoleClient { + +namespace { + +constexpr int DEFAULT_COUNT = 100; +constexpr int DEFAULT_INTERVAL_MS = 100; +constexpr TCommandPing::EPingType DEFAULT_PING_TYPE = TCommandPing::EPingType::Select1; + +const TVector PingTypeDescriptions = { + "ping returns right from the GRPC layer", + "ping goes through GRPC layer right to the GRPC proxy and returns", + "ping goes until query processing layer and returns without any query execution", + "ping executes a very simple 'SELECT 1;' query", +}; + +} // anonymous + +using namespace NKikimr::NOperationId; + +TCommandPing::TCommandPing() + : TYdbCommand("ping", {}, "ping YDB") + , Count(DEFAULT_COUNT) + , IntervalMs(DEFAULT_INTERVAL_MS) + , PingType(DEFAULT_PING_TYPE) +{} + +void TCommandPing::Config(TConfig& config) { + TYdbCommand::Config(config); + + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + TStringStream pingTypesDescription; + pingTypesDescription << "Ping types, available options:"; + for (size_t i = 0; i < PingTypeDescriptions.size(); ++i) { + EPingType type = static_cast(i); + + pingTypesDescription << "\n" << colors.ItalicOn() << type << "\n " << colors.ItalicOff() << PingTypeDescriptions[i]; + } + pingTypesDescription << "\nDefault: " << PingType << Endl; + + + config.Opts->AddLongOption( + 'c', "count", TStringBuilder() << "stop after replies, default " << DEFAULT_COUNT) + .RequiredArgument("COUNT").StoreResult(&Count); + + config.Opts->AddLongOption( + 'i', "interval", TStringBuilder() << " ms between pings, default " << DEFAULT_INTERVAL_MS) + .RequiredArgument("INTERVAL").StoreResult(&IntervalMs); + + config.Opts->AddLongOption('t', "type", pingTypesDescription.Str()) + .OptionalArgument("STRING").StoreResult(&PingType); +} + +void TCommandPing::Parse(TConfig& config) { + TClientCommand::Parse(config); +} + +int TCommandPing::Run(TConfig& config) { + return RunCommand(config); +} + +int TCommandPing::RunCommand(TConfig& config) { + TDriver driver = CreateDriver(config); + NQuery::TQueryClient queryClient(driver); + NDebug::TDebugClient pingClient(driver); + SetInterruptHandlers(); + + std::vector durations; + durations.reserve(Count); + size_t failedCount = 0; + + const TString query = "SELECT 1;"; + + for (int i = 0; i < Count && !IsInterrupted(); ++i) { + bool isOk; + auto start = NMonotonic::TMonotonic::Now(); + + switch (PingType) { + case EPingType::PlainGrpc: + isOk = PingPlainGrpc(pingClient); + break; + case EPingType::PlainKqp: + isOk = PingPlainKqp(pingClient); + break; + case EPingType::GrpcProxy: + isOk = PingGrpcProxy(pingClient); + break; + case EPingType::Select1: + isOk = PingKqpSelect1(queryClient, query); + break; + default: + std::cerr << "Unknown ping type" << std::endl; + return EXIT_FAILURE; + } + + auto end = NMonotonic::TMonotonic::Now(); + auto deltaUs = (end - start).MicroSeconds(); + + std::cout << i << (isOk ? " ok" : " failed") << " in " << deltaUs << " us" << std::endl; + + if (!isOk) { + ++failedCount; + } + + durations.push_back(deltaUs); + + Sleep(TDuration::MilliSeconds(IntervalMs)); + } + + std::sort(durations.begin(), durations.end()); + + std::cout << std::endl; + std::cout << "Total: " << durations.size() << ", failed: " << failedCount << std::endl; + const auto& percentiles = {0.5, 0.9, 0.95, 0.99}; + + for (double percentile: percentiles) { + size_t index = (size_t)(durations.size() * percentile); + std::cout << (int)(percentile * 100) << "%: " + << durations[index] << " us" << std::endl; + } + + return 0; +} + +bool TCommandPing::PingPlainGrpc(NDebug::TDebugClient& client) { + auto asyncResult = client.PlainGrpcPing(NDebug::TPlainGrpcPingSettings()); + asyncResult.GetValueSync(); + + return true; +} + +bool TCommandPing::PingPlainKqp(NDebug::TDebugClient& client) { + auto asyncResult = client.KqpProxyPing(NDebug::TKqpProxyPingSettings()); + auto result = asyncResult.GetValueSync(); + + if (result.IsSuccess()) { + return true; + } + + return false; +} + +bool TCommandPing::PingGrpcProxy(NDebug::TDebugClient& client) { + auto asyncResult = client.GrpcProxyPing(NDebug::TGrpcProxyPingSettings()); + auto result = asyncResult.GetValueSync(); + + if (result.IsSuccess()) { + return true; + } + + return false; +} + +bool TCommandPing::PingKqpSelect1(NQuery::TQueryClient& client, const TString& query) { + // Single stream execution + NQuery::TExecuteQuerySettings settings; + + // Execute query + settings.ExecMode(NQuery::EExecMode::Execute); + settings.StatsMode(NQuery::EStatsMode::None); + + settings.Syntax(NQuery::ESyntax::YqlV1); + + // Execute query without parameters + auto asyncResult = client.StreamExecuteQuery( + query, + NQuery::TTxControl::NoTx(), + settings + ); + + auto result = asyncResult.GetValueSync(); + while (!IsInterrupted()) { + auto streamPart = result.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + if (streamPart.EOS()) { + return false; + } + return false; + } + + if (streamPart.HasResultSet()) { + return true; + } + } + + return false; +} + +} // NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.h b/ydb/public/lib/ydb_cli/commands/ydb_ping.h new file mode 100644 index 000000000000..3f8fb6b546a6 --- /dev/null +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.h @@ -0,0 +1,58 @@ +#pragma once + +#pragma once + +#include "ydb_command.h" + +#include +#include + +namespace NYdb { + +namespace NQuery { + class TExecuteQueryIterator; + class TQueryClient; +} // namespace NQuery + +namespace NDebug { + class TDebugClient; +}; + +namespace NConsoleClient { + +class TCommandPing : public TYdbCommand, public TCommandWithFormat, + public TInterruptibleCommand +{ +public: + enum class EPingType { + PlainGrpc = 0, + GrpcProxy, + PlainKqp, + Select1, + }; + +public: + TCommandPing(); + virtual void Config(TConfig& config) override; + virtual void Parse(TConfig& config) override; + virtual int Run(TConfig& config) override; + +private: + int RunCommand(TConfig& config); + int PrintResponse(NQuery::TExecuteQueryIterator& result); + + bool PingPlainGrpc(NDebug::TDebugClient& client); + bool PingPlainKqp(NDebug::TDebugClient& client); + bool PingGrpcProxy(NDebug::TDebugClient& client); + + bool PingKqpSelect1(NQuery::TQueryClient& client, const TString& query); + +private: + int Count; + int IntervalMs; + + EPingType PingType; +}; + +} // NYdb::NConsoleClient +} // namespace NYdb diff --git a/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp b/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp index b52fa0dd21d7..188b70d29d11 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_root_common.cpp @@ -1,6 +1,7 @@ #include "ydb_root_common.h" #include "ydb_profile.h" #include "ydb_admin.h" +#include "ydb_debug.h" #include "ydb_service_auth.h" #include "ydb_service_discovery.h" #include "ydb_service_export.h" @@ -52,6 +53,7 @@ TClientCommandRootCommon::TClientCommandRootCommon(const TString& name, const TC AddCommand(std::make_unique()); AddCommand(std::make_unique()); AddCommand(std::make_unique()); + AddCommand(std::make_unique()); } void TClientCommandRootCommon::ValidateSettings() { diff --git a/ydb/public/sdk/cpp/client/ydb_debug/client.cpp b/ydb/public/sdk/cpp/client/ydb_debug/client.cpp new file mode 100644 index 000000000000..132ecd2a72bd --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_debug/client.cpp @@ -0,0 +1,94 @@ +#include "client.h" + +#include +#include +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include + +namespace NYdb::NDebug { + +using namespace NThreading; + +class TDebugClient::TImpl: public TClientImplCommon { +public: + TImpl(std::shared_ptr&& connections, const TClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + {} + + TAsyncPlainGrpcPingResult PlainGrpcPing(const TPlainGrpcPingSettings& settings) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (Ydb::Debug::PlainGrpcResponse*, TPlainStatus status) mutable { + TPlainGrpcPingResult val(TStatus(std::move(status))); + pingPromise.SetValue(std::move(val)); + }; + + Connections_->Run( + Ydb::Debug::PlainGrpcRequest(), + responseCb, + &Ydb::Debug::V1::DebugService::Stub::AsyncPingPlainGrpc, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return pingPromise; + } + + TAsyncGrpcProxyPingResult GrpcProxyPing(const TGrpcProxyPingSettings& settings) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (Ydb::Debug::GrpcProxyResponse*, TPlainStatus status) mutable { + TGrpcProxyPingResult val(TStatus(std::move(status))); + pingPromise.SetValue(std::move(val)); + }; + + Connections_->Run( + Ydb::Debug::GrpcProxyRequest(), + responseCb, + &Ydb::Debug::V1::DebugService::Stub::AsyncPingGrpcProxy, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return pingPromise; + } + + TAsyncKqpProxyPingResult KqpProxyPing(const TKqpProxyPingSettings& settings) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (Ydb::Debug::KqpProxyResponse*, TPlainStatus status) mutable { + TKqpProxyPingResult val(TStatus(std::move(status))); + pingPromise.SetValue(std::move(val)); + }; + + Connections_->Run( + Ydb::Debug::KqpProxyRequest(), + responseCb, + &Ydb::Debug::V1::DebugService::Stub::AsyncPingKqpProxy, + DbDriverState_, + TRpcRequestSettings::Make(settings)); + + return pingPromise; + } + + ~TImpl() = default; +}; + +TDebugClient::TDebugClient(const TDriver& driver, const TClientSettings& settings) + : Impl_(new TImpl(CreateInternalInterface(driver), settings)) +{ +} + +TAsyncPlainGrpcPingResult TDebugClient::PlainGrpcPing(const TPlainGrpcPingSettings& settings) { + return Impl_->PlainGrpcPing(settings); +} + +TAsyncGrpcProxyPingResult TDebugClient::GrpcProxyPing(const TGrpcProxyPingSettings& settings) { + return Impl_->GrpcProxyPing(settings); +} + +TAsyncKqpProxyPingResult TDebugClient::KqpProxyPing(const TKqpProxyPingSettings& settings) { + return Impl_->KqpProxyPing(settings); +} + +} // namespace NYdb::NDebug \ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_debug/client.h b/ydb/public/sdk/cpp/client/ydb_debug/client.h new file mode 100644 index 000000000000..18ef51c213c2 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_debug/client.h @@ -0,0 +1,61 @@ +#pragma once + +#include + +namespace NYdb::NDebug { + +//////////////////////////////////////////////////////////////////////////////// + +class TPlainGrpcPingResult: public TStatus { +public: + TPlainGrpcPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TGrpcProxyPingResult: public TStatus { +public: + TGrpcProxyPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TKqpProxyPingResult: public TStatus { +public: + TKqpProxyPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + + +//////////////////////////////////////////////////////////////////////////////// + +using TAsyncPlainGrpcPingResult = NThreading::TFuture; +using TAsyncGrpcProxyPingResult = NThreading::TFuture; +using TAsyncKqpProxyPingResult = NThreading::TFuture; + +//////////////////////////////////////////////////////////////////////////////// + +struct TPlainGrpcPingSettings : public TOperationRequestSettings {}; +struct TGrpcProxyPingSettings : public TOperationRequestSettings {}; +struct TKqpProxyPingSettings : public TOperationRequestSettings {}; + +struct TClientSettings : public TCommonClientSettingsBase { +}; + +class TDebugClient { +public: + using TAsyncPlainGrpcPingResult = TAsyncPlainGrpcPingResult; +public: + TDebugClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); + + TAsyncPlainGrpcPingResult PlainGrpcPing(const TPlainGrpcPingSettings& settings); + TAsyncGrpcProxyPingResult GrpcProxyPing(const TGrpcProxyPingSettings& settings); + TAsyncKqpProxyPingResult KqpProxyPing(const TKqpProxyPingSettings& settings); + +private: + class TImpl; + std::shared_ptr Impl_; +}; + +} // namespace NYdb::NDebug diff --git a/ydb/public/sdk/cpp/client/ydb_debug/ya.make b/ydb/public/sdk/cpp/client/ydb_debug/ya.make new file mode 100644 index 000000000000..adb981afed3a --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_debug/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + client.cpp +) + +PEERDIR( + ydb/public/api/protos + ydb/public/sdk/cpp/client/ydb_common_client/impl +) + +END() diff --git a/ydb/public/sdk/cpp/ya.make b/ydb/public/sdk/cpp/ya.make index 4e84a7f9d0ef..2e918f9994cf 100644 --- a/ydb/public/sdk/cpp/ya.make +++ b/ydb/public/sdk/cpp/ya.make @@ -20,6 +20,7 @@ RECURSE( client/ydb_coordination client/ydb_coordination/ut client/ydb_datastreams + client/ydb_debug client/ydb_discovery client/ydb_driver client/ydb_driver/ut diff --git a/ydb/services/ydb/ya.make b/ydb/services/ydb/ya.make index 7e8af11cb010..0dbb567a8992 100644 --- a/ydb/services/ydb/ya.make +++ b/ydb/services/ydb/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( ydb_clickhouse_internal.cpp + ydb_debug.cpp ydb_dummy.cpp ydb_export.cpp ydb_import.cpp diff --git a/ydb/services/ydb/ydb_debug.cpp b/ydb/services/ydb/ydb_debug.cpp new file mode 100644 index 000000000000..879fdd9a7d13 --- /dev/null +++ b/ydb/services/ydb/ydb_debug.cpp @@ -0,0 +1,76 @@ +#include "ydb_debug.h" + +#include +#include +#include +#include + +namespace NKikimr::NGRpcService { + +TGRpcYdbDebugService::TGRpcYdbDebugService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxyId, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + +TGRpcYdbDebugService::TGRpcYdbDebugService(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TGrpcServiceBase(system, counters, proxies, rlAllowed) + , HandlersPerCompletionQueue(Max(size_t{1}, handlersPerCompletionQueue)) +{ +} + +void TGRpcYdbDebugService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { + using namespace Ydb::Debug; + + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + size_t proxyCounter = 0; + + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { + for (auto* cq: CQS) { + MakeIntrusive>(this, &Service_, cq, + [this](NYdbGrpc::IRequestContextBase* ctx) { + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); + PlainGrpcResponse response; + ctx->Reply(&response, 0); + }, &Ydb::Debug::V1::DebugService::AsyncService::RequestPingPlainGrpc, + "PingPlainGrpc", logger, getCounterBlock("ping", "PingPlainGrpc"))->Run(); + } + } + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif +#define ADD_REQUEST(NAME, IN, OUT, CB, REQUEST_TYPE) \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + MakeIntrusive>(this, &Service_, cq, \ + [this, proxyCounter](NYdbGrpc::IRequestContextBase* ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcProxies_[proxyCounter % GRpcProxies_.size()], \ + new TGrpcRequestNoOperationCall \ + (ctx, &CB, TRequestAuxSettings { \ + .RlMode = RLSWITCH(TRateLimiterMode::Rps), \ + .RequestType = NJaegerTracing::ERequestType::PING_##REQUEST_TYPE, \ + })); \ + }, &Ydb::Debug::V1::DebugService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("query", #NAME))->Run(); \ + ++proxyCounter; \ + } \ + } + + ADD_REQUEST(PingGrpcProxy, GrpcProxyRequest, GrpcProxyResponse, DoGrpcProxyPing, PROXY); + ADD_REQUEST(PingKqpProxy, KqpProxyRequest, KqpProxyResponse, DoKqpPing, KQP); + +#undef ADD_REQUEST + +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/ydb/ydb_debug.h b/ydb/services/ydb/ydb_debug.h new file mode 100644 index 000000000000..7210bc763a8b --- /dev/null +++ b/ydb/services/ydb/ydb_debug.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +namespace NKikimr::NGRpcService { + +class TGRpcYdbDebugService + : public TGrpcServiceBase +{ +public: + using TGrpcServiceBase::TGrpcServiceBase; + + TGRpcYdbDebugService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& proxyId, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + + TGRpcYdbDebugService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue); + +private: + void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; +}; + +} // namespace NKikimr::NGRpcService From 5d449ef6a4f203b2a56c85711f27e4543fb621d8 Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Wed, 20 Nov 2024 17:57:35 +0100 Subject: [PATCH 2/4] More debug pings and cleanup --- ydb/core/grpc_services/rpc_ping.cpp | 185 ++++++++++++++++-- ydb/core/grpc_services/service_debug.h | 2 + .../jaeger_tracing/request_discriminator.h | 2 + ydb/public/api/grpc/ydb_debug_v1.proto | 2 + ydb/public/api/protos/ydb_debug.proto | 20 ++ ydb/public/lib/ydb_cli/commands/ydb_ping.cpp | 39 +++- ydb/public/lib/ydb_cli/commands/ydb_ping.h | 4 + .../sdk/cpp/client/ydb_debug/client.cpp | 77 +++----- ydb/public/sdk/cpp/client/ydb_debug/client.h | 28 ++- ydb/services/ydb/ydb_debug.cpp | 2 + 10 files changed, 291 insertions(+), 70 deletions(-) diff --git a/ydb/core/grpc_services/rpc_ping.cpp b/ydb/core/grpc_services/rpc_ping.cpp index f9e81d8a43df..7198cd7d056d 100644 --- a/ydb/core/grpc_services/rpc_ping.cpp +++ b/ydb/core/grpc_services/rpc_ping.cpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include @@ -9,24 +11,28 @@ namespace NKikimr::NGRpcService { +using namespace Ydb; + namespace { using namespace NActors; -using TEvKqpProxyRequest = TGrpcRequestNoOperationCall; +//////////////////////////////////////////////////////////////////////////////// + +using TEvKqpProxyRequest = TGrpcRequestNoOperationCall; -class TExecutePingRPC : public TActorBootstrapped { +class TExecuteKqpPingRPC : public TActorBootstrapped { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::OTHER; } - TExecutePingRPC(TEvKqpProxyRequest* request) + TExecuteKqpPingRPC(TEvKqpProxyRequest* request) : Request_(request) {} void Bootstrap(const TActorContext &ctx) { - this->Become(&TExecutePingRPC::StateWork); + this->Become(&TThis::StateWork); Proceed(ctx); } @@ -48,51 +54,198 @@ class TExecutePingRPC : public TActorBootstrapped { LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to KQP proxy"); if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), new NKqp::TEvKqp::TEvProxyPingRequest())) { LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping"); - ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, ctx); + ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx); } } void Handle(NKqp::TEvKqp::TEvProxyPingResponse::TPtr&, const TActorContext& ctx) { LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response"); - ReplyWithResult(Ydb::StatusIds::SUCCESS, ctx); + ReplyWithResult(StatusIds::SUCCESS, ctx); } private: - void ReplyWithResult(Ydb::StatusIds::StatusCode status, const TActorContext &ctx) { + void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) { Request_->ReplyWithYdbStatus(status); Die(ctx); } void InternalError(const TString& message) { ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); - ReplyWithResult(Ydb::StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); + ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); } void UnexpectedEvent(const TString& state, TAutoPtr& ev) { - InternalError(TStringBuilder() << "TExecutePingRPC in state " << state << " received unexpected event " << - ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + InternalError(TStringBuilder() << "TExecuteKqpPingRPC in state " << state << " received unexpected event " + << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); } - private: std::shared_ptr Request_; }; +//////////////////////////////////////////////////////////////////////////////// + +using TEvSchemeCacheRequest = TGrpcRequestNoOperationCall; + +class TExecuteSchemeCachePingRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::OTHER; + } + + TExecuteSchemeCachePingRPC(TEvSchemeCacheRequest* request) + : Request_(request) + {} + + void Bootstrap(const TActorContext &ctx) { + this->Become(&TThis::StateWork); + + Proceed(ctx); + } + +private: + void StateWork(TAutoPtr& ev) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + UnexpectedEvent(__func__, ev); + } + } catch (const yexception& ex) { + InternalError(ex.what()); + } + } + + void Proceed(const TActorContext &ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to SchemeCache"); + + auto* request = new TEvTxProxySchemeCache::TEvNavigateKeySet(new NSchemeCache::TSchemeCacheNavigate()); + if (!ctx.Send(MakeSchemeCacheID(), request)) { + LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to SchemeCache"); + ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx); + } + } + + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from SchemeCache"); + ReplyWithResult(StatusIds::SUCCESS, ctx); + } + +private: + void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) { + Request_->ReplyWithYdbStatus(status); + Die(ctx); + } + + void InternalError(const TString& message) { + ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); + ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); + } + + void UnexpectedEvent(const TString& state, TAutoPtr& ev) { + InternalError(TStringBuilder() << "TExecuteSchemeCachePingRPC in state " << state << + " received unexpected event " << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + } + +private: + std::shared_ptr Request_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +using TEvTxProxyRequest = TGrpcRequestNoOperationCall; + +class TExecuteTxProxyPingRPC : public TActorBootstrapped { +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::OTHER; + } + + TExecuteTxProxyPingRPC(TEvTxProxyRequest* request) + : Request_(request) + {} + + void Bootstrap(const TActorContext &ctx) { + this->Become(&TThis::StateWork); + + Proceed(ctx); + } + +private: + void StateWork(TAutoPtr& ev) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); + default: + UnexpectedEvent(__func__, ev); + } + } catch (const yexception& ex) { + InternalError(ex.what()); + } + } + + void Proceed(const TActorContext &ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " sending ping to TxProxy"); + if (!ctx.Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId)) { + LOG_ERROR_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " failed to send ping to TxProxy"); + ReplyWithResult(StatusIds::INTERNAL_ERROR, ctx); + } + } + + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr&, const TActorContext& ctx) { + LOG_TRACE_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " got ping response from TxProxy"); + ReplyWithResult(StatusIds::SUCCESS, ctx); + } + +private: + void ReplyWithResult(StatusIds::StatusCode status, const TActorContext &ctx) { + Request_->ReplyWithYdbStatus(status); + Die(ctx); + } + + void InternalError(const TString& message) { + ALOG_ERROR(NKikimrServices::RPC_REQUEST, "Internal error, message: " << message); + ReplyWithResult(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); + } + + void UnexpectedEvent(const TString& state, TAutoPtr& ev) { + InternalError(TStringBuilder() << "TExecuteTxProxyPingRPC in state " << state << " received unexpected event " + << ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); + } + +private: + std::shared_ptr Request_; +}; + } // anonymous +//////////////////////////////////////////////////////////////////////////////// + void DoGrpcProxyPing(std::unique_ptr p, const IFacilityProvider& f) { // we are in the GRPC proxy already (or in the check actor in case of auth check), // thus ready to reply right here - using TRequest = TGrpcRequestNoOperationCall; + using TRequest = TGrpcRequestNoOperationCall; TRequest* request = dynamic_cast(p.get()); - Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper"); - request->ReplyWithYdbStatus(Ydb::StatusIds::SUCCESS); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoGrpcProxyPing"); + request->ReplyWithYdbStatus(StatusIds::SUCCESS); } void DoKqpPing(std::unique_ptr p, const IFacilityProvider& f) { auto* request = dynamic_cast(p.release()); - Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper"); - f.RegisterActor(new TExecutePingRPC(request)); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoKqpPing"); + f.RegisterActor(new TExecuteKqpPingRPC(request)); +} + +void DoSchemeCachePing(std::unique_ptr p, const IFacilityProvider& f) { + auto* request = dynamic_cast(p.release()); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoSchemeCachePing"); + f.RegisterActor(new TExecuteSchemeCachePingRPC(request)); +} + +void DoTxProxyPing(std::unique_ptr p, const IFacilityProvider& f) { + auto* request = dynamic_cast(p.release()); + Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper in DoTxProxyPing"); + f.RegisterActor(new TExecuteTxProxyPingRPC(request)); } } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/service_debug.h b/ydb/core/grpc_services/service_debug.h index 486c6798db00..df73e870d829 100644 --- a/ydb/core/grpc_services/service_debug.h +++ b/ydb/core/grpc_services/service_debug.h @@ -10,5 +10,7 @@ class IFacilityProvider; void DoGrpcProxyPing(std::unique_ptr p, const IFacilityProvider& f); void DoKqpPing(std::unique_ptr p, const IFacilityProvider& f); +void DoSchemeCachePing(std::unique_ptr p, const IFacilityProvider& f); +void DoTxProxyPing(std::unique_ptr p, const IFacilityProvider& f); } // namespace NKikimr::NGRpcService diff --git a/ydb/core/jaeger_tracing/request_discriminator.h b/ydb/core/jaeger_tracing/request_discriminator.h index 2c20bb838706..ab007bde4eda 100644 --- a/ydb/core/jaeger_tracing/request_discriminator.h +++ b/ydb/core/jaeger_tracing/request_discriminator.h @@ -73,6 +73,8 @@ enum class ERequestType: size_t { PING_GRPC, PING_PROXY, PING_KQP, + PING_SCHEME_CACHE, + PING_TX_PROXY, REQUEST_TYPES_CNT, // Add new types above this line }; diff --git a/ydb/public/api/grpc/ydb_debug_v1.proto b/ydb/public/api/grpc/ydb_debug_v1.proto index 26aa568b3a01..7d3b414338f5 100644 --- a/ydb/public/api/grpc/ydb_debug_v1.proto +++ b/ydb/public/api/grpc/ydb_debug_v1.proto @@ -9,4 +9,6 @@ service DebugService { rpc PingPlainGrpc(Debug.PlainGrpcRequest) returns (Debug.PlainGrpcResponse); rpc PingGrpcProxy(Debug.GrpcProxyRequest) returns (Debug.GrpcProxyResponse); rpc PingKqpProxy(Debug.KqpProxyRequest) returns (Debug.KqpProxyResponse); + rpc PingSchemeCache(Debug.SchemeCacheRequest) returns (Debug.SchemeCacheResponse); + rpc PingTxProxy(Debug.TxProxyRequest) returns (Debug.TxProxyResponse); } diff --git a/ydb/public/api/protos/ydb_debug.proto b/ydb/public/api/protos/ydb_debug.proto index 166a80b78b02..163d578e3496 100644 --- a/ydb/public/api/protos/ydb_debug.proto +++ b/ydb/public/api/protos/ydb_debug.proto @@ -37,3 +37,23 @@ message KqpProxyResponse { StatusIds.StatusCode status = 1; repeated Ydb.Issue.IssueMessage issues = 2; } + +// Ping SchemeCache + +message SchemeCacheRequest { +} + +message SchemeCacheResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} + +// Ping TxProxy + +message TxProxyRequest { +} + +message TxProxyResponse { + StatusIds.StatusCode status = 1; + repeated Ydb.Issue.IssueMessage issues = 2; +} diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp index fe8ebb3354f8..f142ba06e1b4 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp @@ -18,6 +18,8 @@ const TVector PingTypeDescriptions = { "ping goes through GRPC layer right to the GRPC proxy and returns", "ping goes until query processing layer and returns without any query execution", "ping executes a very simple 'SELECT 1;' query", + "ping goes through GRPC layer to SchemeCache and returns", + "ping goes through GRPC layer to TxProxy and allocates TxId", }; } // anonymous @@ -40,7 +42,8 @@ void TCommandPing::Config(TConfig& config) { for (size_t i = 0; i < PingTypeDescriptions.size(); ++i) { EPingType type = static_cast(i); - pingTypesDescription << "\n" << colors.ItalicOn() << type << "\n " << colors.ItalicOff() << PingTypeDescriptions[i]; + pingTypesDescription << "\n" << colors.ItalicOn() << type << colors.ItalicOff() + << "\n " << PingTypeDescriptions[i]; } pingTypesDescription << "\nDefault: " << PingType << Endl; @@ -94,6 +97,12 @@ int TCommandPing::RunCommand(TConfig& config) { case EPingType::Select1: isOk = PingKqpSelect1(queryClient, query); break; + case EPingType::SchemeCache: + isOk = PingSchemeCache(pingClient); + break; + case EPingType::TxProxy: + isOk = PingTxProxy(pingClient); + break; default: std::cerr << "Unknown ping type" << std::endl; return EXIT_FAILURE; @@ -129,14 +138,14 @@ int TCommandPing::RunCommand(TConfig& config) { } bool TCommandPing::PingPlainGrpc(NDebug::TDebugClient& client) { - auto asyncResult = client.PlainGrpcPing(NDebug::TPlainGrpcPingSettings()); + auto asyncResult = client.PingPlainGrpc(NDebug::TPlainGrpcPingSettings()); asyncResult.GetValueSync(); return true; } bool TCommandPing::PingPlainKqp(NDebug::TDebugClient& client) { - auto asyncResult = client.KqpProxyPing(NDebug::TKqpProxyPingSettings()); + auto asyncResult = client.PingKqpProxy(NDebug::TKqpProxyPingSettings()); auto result = asyncResult.GetValueSync(); if (result.IsSuccess()) { @@ -147,7 +156,29 @@ bool TCommandPing::PingPlainKqp(NDebug::TDebugClient& client) { } bool TCommandPing::PingGrpcProxy(NDebug::TDebugClient& client) { - auto asyncResult = client.GrpcProxyPing(NDebug::TGrpcProxyPingSettings()); + auto asyncResult = client.PingGrpcProxy(NDebug::TGrpcProxyPingSettings()); + auto result = asyncResult.GetValueSync(); + + if (result.IsSuccess()) { + return true; + } + + return false; +} + +bool TCommandPing::PingSchemeCache(NDebug::TDebugClient& client) { + auto asyncResult = client.PingSchemeCache(NDebug::TSchemeCachePingSettings()); + auto result = asyncResult.GetValueSync(); + + if (result.IsSuccess()) { + return true; + } + + return false; +} + +bool TCommandPing::PingTxProxy(NDebug::TDebugClient& client) { + auto asyncResult = client.PingTxProxy(NDebug::TTxProxyPingSettings()); auto result = asyncResult.GetValueSync(); if (result.IsSuccess()) { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.h b/ydb/public/lib/ydb_cli/commands/ydb_ping.h index 3f8fb6b546a6..d728c1a4c971 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.h @@ -29,6 +29,8 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, GrpcProxy, PlainKqp, Select1, + SchemeCache, + TxProxy, }; public: @@ -44,6 +46,8 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, bool PingPlainGrpc(NDebug::TDebugClient& client); bool PingPlainKqp(NDebug::TDebugClient& client); bool PingGrpcProxy(NDebug::TDebugClient& client); + bool PingSchemeCache(NDebug::TDebugClient& client); + bool PingTxProxy(NDebug::TDebugClient& client); bool PingKqpSelect1(NQuery::TQueryClient& client, const TString& query); diff --git a/ydb/public/sdk/cpp/client/ydb_debug/client.cpp b/ydb/public/sdk/cpp/client/ydb_debug/client.cpp index 132ecd2a72bd..0749645dd847 100644 --- a/ydb/public/sdk/cpp/client/ydb_debug/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_debug/client.cpp @@ -12,6 +12,8 @@ namespace NYdb::NDebug { +using namespace Ydb; + using namespace NThreading; class TDebugClient::TImpl: public TClientImplCommon { @@ -20,51 +22,18 @@ class TDebugClient::TImpl: public TClientImplCommon { : TClientImplCommon(std::move(connections), settings) {} - TAsyncPlainGrpcPingResult PlainGrpcPing(const TPlainGrpcPingSettings& settings) { - auto pingPromise = NewPromise(); - auto responseCb = [pingPromise] (Ydb::Debug::PlainGrpcResponse*, TPlainStatus status) mutable { - TPlainGrpcPingResult val(TStatus(std::move(status))); - pingPromise.SetValue(std::move(val)); - }; - - Connections_->Run( - Ydb::Debug::PlainGrpcRequest(), - responseCb, - &Ydb::Debug::V1::DebugService::Stub::AsyncPingPlainGrpc, - DbDriverState_, - TRpcRequestSettings::Make(settings)); - - return pingPromise; - } - - TAsyncGrpcProxyPingResult GrpcProxyPing(const TGrpcProxyPingSettings& settings) { - auto pingPromise = NewPromise(); - auto responseCb = [pingPromise] (Ydb::Debug::GrpcProxyResponse*, TPlainStatus status) mutable { - TGrpcProxyPingResult val(TStatus(std::move(status))); - pingPromise.SetValue(std::move(val)); - }; - - Connections_->Run( - Ydb::Debug::GrpcProxyRequest(), - responseCb, - &Ydb::Debug::V1::DebugService::Stub::AsyncPingGrpcProxy, - DbDriverState_, - TRpcRequestSettings::Make(settings)); - - return pingPromise; - } - - TAsyncKqpProxyPingResult KqpProxyPing(const TKqpProxyPingSettings& settings) { - auto pingPromise = NewPromise(); - auto responseCb = [pingPromise] (Ydb::Debug::KqpProxyResponse*, TPlainStatus status) mutable { - TKqpProxyPingResult val(TStatus(std::move(status))); + template + auto Ping(const TSettings& settings, auto serviceMethod) { + auto pingPromise = NewPromise(); + auto responseCb = [pingPromise] (TResponse*, TPlainStatus status) mutable { + TResult val(TStatus(std::move(status))); pingPromise.SetValue(std::move(val)); }; - Connections_->Run( - Ydb::Debug::KqpProxyRequest(), + Connections_->Run( + TRequest(), responseCb, - &Ydb::Debug::V1::DebugService::Stub::AsyncPingKqpProxy, + serviceMethod, DbDriverState_, TRpcRequestSettings::Make(settings)); @@ -79,16 +48,30 @@ TDebugClient::TDebugClient(const TDriver& driver, const TClientSettings& setting { } -TAsyncPlainGrpcPingResult TDebugClient::PlainGrpcPing(const TPlainGrpcPingSettings& settings) { - return Impl_->PlainGrpcPing(settings); +TAsyncPlainGrpcPingResult TDebugClient::PingPlainGrpc(const TPlainGrpcPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingPlainGrpc); +} + +TAsyncGrpcProxyPingResult TDebugClient::PingGrpcProxy(const TGrpcProxyPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingGrpcProxy); } -TAsyncGrpcProxyPingResult TDebugClient::GrpcProxyPing(const TGrpcProxyPingSettings& settings) { - return Impl_->GrpcProxyPing(settings); +TAsyncKqpProxyPingResult TDebugClient::PingKqpProxy(const TKqpProxyPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingKqpProxy); +} + +TAsyncSchemeCachePingResult TDebugClient::PingSchemeCache(const TSchemeCachePingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingSchemeCache); + } -TAsyncKqpProxyPingResult TDebugClient::KqpProxyPing(const TKqpProxyPingSettings& settings) { - return Impl_->KqpProxyPing(settings); +TAsyncTxProxyPingResult TDebugClient::PingTxProxy(const TTxProxyPingSettings& settings) { + return Impl_->Ping( + settings, &Debug::V1::DebugService::Stub::AsyncPingTxProxy); } } // namespace NYdb::NDebug \ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_debug/client.h b/ydb/public/sdk/cpp/client/ydb_debug/client.h index 18ef51c213c2..9e0b4581a7a3 100644 --- a/ydb/public/sdk/cpp/client/ydb_debug/client.h +++ b/ydb/public/sdk/cpp/client/ydb_debug/client.h @@ -27,18 +27,37 @@ class TKqpProxyPingResult: public TStatus { {} }; +class TSchemeCachePingResult: public TStatus { +public: + TSchemeCachePingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; + +class TTxProxyPingResult: public TStatus { +public: + TTxProxyPingResult(TStatus&& status) + : TStatus(std::move(status)) + {} +}; //////////////////////////////////////////////////////////////////////////////// using TAsyncPlainGrpcPingResult = NThreading::TFuture; using TAsyncGrpcProxyPingResult = NThreading::TFuture; using TAsyncKqpProxyPingResult = NThreading::TFuture; +using TAsyncSchemeCachePingResult = NThreading::TFuture; +using TAsyncTxProxyPingResult = NThreading::TFuture; //////////////////////////////////////////////////////////////////////////////// struct TPlainGrpcPingSettings : public TOperationRequestSettings {}; struct TGrpcProxyPingSettings : public TOperationRequestSettings {}; struct TKqpProxyPingSettings : public TOperationRequestSettings {}; +struct TSchemeCachePingSettings : public TOperationRequestSettings {}; +struct TTxProxyPingSettings : public TOperationRequestSettings {}; + +//////////////////////////////////////////////////////////////////////////////// struct TClientSettings : public TCommonClientSettingsBase { }; @@ -49,9 +68,12 @@ class TDebugClient { public: TDebugClient(const TDriver& driver, const TClientSettings& settings = TClientSettings()); - TAsyncPlainGrpcPingResult PlainGrpcPing(const TPlainGrpcPingSettings& settings); - TAsyncGrpcProxyPingResult GrpcProxyPing(const TGrpcProxyPingSettings& settings); - TAsyncKqpProxyPingResult KqpProxyPing(const TKqpProxyPingSettings& settings); + TAsyncPlainGrpcPingResult PingPlainGrpc(const TPlainGrpcPingSettings& settings); + TAsyncGrpcProxyPingResult PingGrpcProxy(const TGrpcProxyPingSettings& settings); + TAsyncKqpProxyPingResult PingKqpProxy(const TKqpProxyPingSettings& settings); + + TAsyncSchemeCachePingResult PingSchemeCache(const TSchemeCachePingSettings& settings); + TAsyncTxProxyPingResult PingTxProxy(const TTxProxyPingSettings& settings); private: class TImpl; diff --git a/ydb/services/ydb/ydb_debug.cpp b/ydb/services/ydb/ydb_debug.cpp index 879fdd9a7d13..194f609b9ece 100644 --- a/ydb/services/ydb/ydb_debug.cpp +++ b/ydb/services/ydb/ydb_debug.cpp @@ -68,6 +68,8 @@ void TGRpcYdbDebugService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { ADD_REQUEST(PingGrpcProxy, GrpcProxyRequest, GrpcProxyResponse, DoGrpcProxyPing, PROXY); ADD_REQUEST(PingKqpProxy, KqpProxyRequest, KqpProxyResponse, DoKqpPing, KQP); + ADD_REQUEST(PingSchemeCache, SchemeCacheRequest, SchemeCacheResponse, DoSchemeCachePing, SCHEME_CACHE); + ADD_REQUEST(PingTxProxy, TxProxyRequest, TxProxyResponse, DoTxProxyPing, TX_PROXY); #undef ADD_REQUEST From 4d01438035df6e451041cf7c5db60d6b6af9694f Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Thu, 28 Nov 2024 14:45:01 +0100 Subject: [PATCH 3/4] rename ping type to kind --- ydb/public/lib/ydb_cli/commands/ydb_ping.cpp | 41 ++++++++++---------- ydb/public/lib/ydb_cli/commands/ydb_ping.h | 4 +- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp index f142ba06e1b4..81d24b93ac14 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.cpp @@ -11,9 +11,9 @@ namespace { constexpr int DEFAULT_COUNT = 100; constexpr int DEFAULT_INTERVAL_MS = 100; -constexpr TCommandPing::EPingType DEFAULT_PING_TYPE = TCommandPing::EPingType::Select1; +constexpr TCommandPing::EPingKind DEFAULT_PING_KIND = TCommandPing::EPingKind::Select1; -const TVector PingTypeDescriptions = { +const TVector PingKindDescriptions = { "ping returns right from the GRPC layer", "ping goes through GRPC layer right to the GRPC proxy and returns", "ping goes until query processing layer and returns without any query execution", @@ -30,22 +30,22 @@ TCommandPing::TCommandPing() : TYdbCommand("ping", {}, "ping YDB") , Count(DEFAULT_COUNT) , IntervalMs(DEFAULT_INTERVAL_MS) - , PingType(DEFAULT_PING_TYPE) + , PingKind(DEFAULT_PING_KIND) {} void TCommandPing::Config(TConfig& config) { TYdbCommand::Config(config); NColorizer::TColors colors = NColorizer::AutoColors(Cout); - TStringStream pingTypesDescription; - pingTypesDescription << "Ping types, available options:"; - for (size_t i = 0; i < PingTypeDescriptions.size(); ++i) { - EPingType type = static_cast(i); + TStringStream pingKindsDescription; + pingKindsDescription << "Ping kind, available options:"; + for (size_t i = 0; i < PingKindDescriptions.size(); ++i) { + EPingKind kind = static_cast(i); - pingTypesDescription << "\n" << colors.ItalicOn() << type << colors.ItalicOff() - << "\n " << PingTypeDescriptions[i]; + pingKindsDescription << "\n" << colors.ItalicOn() << kind << colors.ItalicOff() + << "\n " << PingKindDescriptions[i]; } - pingTypesDescription << "\nDefault: " << PingType << Endl; + pingKindsDescription << "\nDefault: " << PingKind << Endl; config.Opts->AddLongOption( @@ -56,8 +56,9 @@ void TCommandPing::Config(TConfig& config) { 'i', "interval", TStringBuilder() << " ms between pings, default " << DEFAULT_INTERVAL_MS) .RequiredArgument("INTERVAL").StoreResult(&IntervalMs); - config.Opts->AddLongOption('t', "type", pingTypesDescription.Str()) - .OptionalArgument("STRING").StoreResult(&PingType); + config.Opts->AddLongOption( + 'k', "kind", pingKindsDescription.Str()) + .OptionalArgument("STRING").StoreResult(&PingKind); } void TCommandPing::Parse(TConfig& config) { @@ -84,27 +85,27 @@ int TCommandPing::RunCommand(TConfig& config) { bool isOk; auto start = NMonotonic::TMonotonic::Now(); - switch (PingType) { - case EPingType::PlainGrpc: + switch (PingKind) { + case EPingKind::PlainGrpc: isOk = PingPlainGrpc(pingClient); break; - case EPingType::PlainKqp: + case EPingKind::PlainKqp: isOk = PingPlainKqp(pingClient); break; - case EPingType::GrpcProxy: + case EPingKind::GrpcProxy: isOk = PingGrpcProxy(pingClient); break; - case EPingType::Select1: + case EPingKind::Select1: isOk = PingKqpSelect1(queryClient, query); break; - case EPingType::SchemeCache: + case EPingKind::SchemeCache: isOk = PingSchemeCache(pingClient); break; - case EPingType::TxProxy: + case EPingKind::TxProxy: isOk = PingTxProxy(pingClient); break; default: - std::cerr << "Unknown ping type" << std::endl; + std::cerr << "Unknown ping kind" << std::endl; return EXIT_FAILURE; } diff --git a/ydb/public/lib/ydb_cli/commands/ydb_ping.h b/ydb/public/lib/ydb_cli/commands/ydb_ping.h index d728c1a4c971..557ee0c2ddd5 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_ping.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_ping.h @@ -24,7 +24,7 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, public TInterruptibleCommand { public: - enum class EPingType { + enum class EPingKind { PlainGrpc = 0, GrpcProxy, PlainKqp, @@ -55,7 +55,7 @@ class TCommandPing : public TYdbCommand, public TCommandWithFormat, int Count; int IntervalMs; - EPingType PingType; + EPingKind PingKind; }; } // NYdb::NConsoleClient From 348c573ecfd0224664511d81bca92faff3f4b2c4 Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Fri, 29 Nov 2024 16:47:52 +0100 Subject: [PATCH 4/4] separate ping group --- ydb/core/jaeger_tracing/request_discriminator.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/jaeger_tracing/request_discriminator.h b/ydb/core/jaeger_tracing/request_discriminator.h index ab007bde4eda..6cf9c6d89393 100644 --- a/ydb/core/jaeger_tracing/request_discriminator.h +++ b/ydb/core/jaeger_tracing/request_discriminator.h @@ -70,6 +70,7 @@ enum class ERequestType: size_t { BSCONFIG_REPLACESTORAGECONFIG, BSCONFIG_FETCHSTORAGECONFIG, + PING_GRPC, PING_PROXY, PING_KQP,