From a48b1893c9e422dbf27589a70c2f4fe5fa8404a8 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Mon, 1 Jul 2024 21:13:13 +0300 Subject: [PATCH] Replication stats (total & per item): lag, initial scan progress (#6092) --- ydb/core/grpc_services/rpc_replication.cpp | 13 +- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 31 +++ ydb/core/protos/replication.proto | 6 + .../tx/replication/controller/controller.cpp | 6 + .../replication/controller/controller_impl.h | 2 + .../replication/controller/private_events.cpp | 13 ++ .../replication/controller/private_events.h | 17 ++ .../tx/replication/controller/replication.h | 1 + .../tx/replication/controller/target_base.cpp | 6 +- .../tx/replication/controller/target_base.h | 1 + .../controller/tx_describe_replication.cpp | 218 +++++++++++++++++- ydb/library/services/services.proto | 1 + .../api/protos/draft/ydb_replication.proto | 10 +- .../ydb_cli/commands/ydb_service_scheme.cpp | 51 +++- .../sdk/cpp/client/draft/ydb_replication.cpp | 33 ++- .../sdk/cpp/client/draft/ydb_replication.h | 27 ++- ydb/public/sdk/cpp/client/ydb_table/table.cpp | 11 + ydb/public/sdk/cpp/client/ydb_table/table.h | 3 + 18 files changed, 416 insertions(+), 34 deletions(-) diff --git a/ydb/core/grpc_services/rpc_replication.cpp b/ydb/core/grpc_services/rpc_replication.cpp index 72021a4762e6..83c28cc04845 100644 --- a/ydb/core/grpc_services/rpc_replication.cpp +++ b/ydb/core/grpc_services/rpc_replication.cpp @@ -102,6 +102,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor(); PathIdFromPathId(pathId, ev->Record.MutablePathId()); + ev->Record.SetIncludeStats(GetProtoRequest()->include_stats()); NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release()); Become(&TDescribeReplicationRPC::StateDescribeReplication); @@ -167,6 +168,13 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActormutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( + from.GetLagMilliSeconds()); + } + if 
(from.HasInitialScanProgress()) { + to.mutable_stats()->set_initial_scan_progress(from.GetInitialScanProgress()); + } } static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) { @@ -174,9 +182,12 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActormutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( + *to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( from.GetStandBy().GetLagMilliSeconds()); } + if (from.GetStandBy().HasInitialScanProgress()) { + to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress()); + } break; case NKikimrReplication::TReplicationState::kError: *to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues()); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index e329feaf56f7..4db4ac5dd9fb 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -5485,7 +5485,10 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } Y_UNIT_TEST(CreateAsyncReplicationWithSecret) { + using namespace NReplication; + TKikimrRunner kikimr("root@builtin"); + auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root")); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -5529,6 +5532,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) { Sleep(TDuration::Seconds(1)); } + + while (true) { + auto settings = TDescribeReplicationSettings().IncludeStats(true); + const auto result = repl.DescribeReplication("/Root/replication", settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& desc = result.GetReplicationDescription(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetState(), TReplicationDescription::EState::Running); + + 
const auto& total = desc.GetRunningState().GetStats(); + if (!total.GetInitialScanProgress() || *total.GetInitialScanProgress() < 100) { + Sleep(TDuration::Seconds(1)); + continue; + } + + UNIT_ASSERT(total.GetInitialScanProgress()); + UNIT_ASSERT_DOUBLES_EQUAL(*total.GetInitialScanProgress(), 100.0, 0.01); + + const auto& items = desc.GetItems(); + UNIT_ASSERT_VALUES_EQUAL(items.size(), 1); + const auto& item = items.at(0).Stats; + + UNIT_ASSERT(item.GetInitialScanProgress()); + UNIT_ASSERT_DOUBLES_EQUAL(*item.GetInitialScanProgress(), *total.GetInitialScanProgress(), 0.01); + + // TODO: check lag too + break; + } } Y_UNIT_TEST(AlterAsyncReplication) { diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 6aa742156189..31b2e2eaba28 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -33,10 +33,14 @@ message TReplicationConfig { message TTargetSpecific { message TTarget { + // in/out optional string SrcPath = 1; optional string DstPath = 2; optional string SrcStreamName = 3; + // out optional uint64 Id = 4; + optional uint32 LagMilliSeconds = 5; + optional float InitialScanProgress = 6; // percentage } repeated TTarget Targets = 1; @@ -59,6 +63,7 @@ message TReplicationConfig { message TReplicationState { message TStandBy { optional uint32 LagMilliSeconds = 1; + optional float InitialScanProgress = 2; // percentage } message TPaused { @@ -147,6 +152,7 @@ message TEvDropReplicationResult { message TEvDescribeReplication { optional NKikimrProto.TPathID PathId = 1; + optional bool IncludeStats = 2; } message TEvDescribeReplicationResult { diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index e49503a3fe85..4f73d84d68b0 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -59,6 +59,7 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvUpdateTenantNodes, 
Handle); HFunc(TEvPrivate::TEvProcessQueues, Handle); HFunc(TEvPrivate::TEvRemoveWorker, Handle); + HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle); HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); HFunc(TEvService::TEvStatus, Handle); @@ -132,6 +133,11 @@ void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const RunTxDescribeReplication(ev, ctx); } +void TController::Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxDescribeReplication(ev, ctx); +} + void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); RunTxDiscoveryTargetsResult(ev, ctx); diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 8663690a246d..927d2d5bf528 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -82,6 +82,7 @@ class TController void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx); @@ -128,6 +129,7 @@ class TController void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& 
ctx); + void RunTxDescribeReplication(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx); void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx); void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx); void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index 30ce9caabe43..f562331cc26d 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -163,6 +163,19 @@ TString TEvPrivate::TEvRemoveWorker::ToString() const { << " }"; } +TEvPrivate::TEvDescribeTargetsResult::TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result) + : Sender(sender) + , ReplicationId(rid) + , Result(std::move(result)) +{ +} + +TString TEvPrivate::TEvDescribeTargetsResult::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " ReplicationId: " << ReplicationId + << " }"; +} + } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) { diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index 8eb3631dd7b5..7383d6f7ffc0 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -8,6 +9,10 @@ #include #include +#include + +#include + namespace NKikimr::NReplication::NController { struct TEvPrivate { @@ -25,6 +30,7 @@ struct TEvPrivate { EvResolveSecretResult, EvAlterDstResult, EvRemoveWorker, + EvDescribeTargetsResult, EvEnd, }; @@ -191,6 +197,17 @@ struct TEvPrivate { TString ToString() const override; }; + struct 
TEvDescribeTargetsResult: public TEventLocal { + using TResult = THashMap>; + + const TActorId Sender; + const ui64 ReplicationId; + TResult Result; + + explicit TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result); + TString ToString() const override; + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index 88ece4eed306..afdcfccebe16 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -79,6 +79,7 @@ class TReplication: public TSimpleRefCount { virtual void AddWorker(ui64 id) = 0; virtual void RemoveWorker(ui64 id) = 0; virtual void UpdateLag(ui64 workerId, TDuration lag) = 0; + virtual const TMaybe GetLag() const = 0; virtual void Progress(const TActorContext& ctx) = 0; virtual void Shutdown(const TActorContext& ctx) = 0; diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 590cdd393884..93b2c59a5b45 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -117,10 +117,14 @@ void TTargetBase::UpdateLag(ui64 workerId, TDuration lag) { } if (TLagProvider::UpdateLag(it->second, workerId, lag)) { - Replication->UpdateLag(GetId(), GetLag().GetRef()); + Replication->UpdateLag(GetId(), TLagProvider::GetLag().GetRef()); } } +const TMaybe TTargetBase::GetLag() const { + return TLagProvider::GetLag(); +} + void TTargetBase::Progress(const TActorContext& ctx) { switch (DstState) { case EDstState::Creating: diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index d1b71710c2bc..4626433cd8d0 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -50,6 +50,7 @@ class TTargetBase void AddWorker(ui64 id) override; void RemoveWorker(ui64 id) 
override; void UpdateLag(ui64 workerId, TDuration lag) override; + const TMaybe GetLag() const override; void Progress(const TActorContext& ctx) override; void Shutdown(const TActorContext& ctx) override; diff --git a/ydb/core/tx/replication/controller/tx_describe_replication.cpp b/ydb/core/tx/replication/controller/tx_describe_replication.cpp index 65b6518c737b..f78f1516445f 100644 --- a/ydb/core/tx/replication/controller/tx_describe_replication.cpp +++ b/ydb/core/tx/replication/controller/tx_describe_replication.cpp @@ -1,17 +1,125 @@ #include "controller_impl.h" +#include "logging.h" +#include "private_events.h" -#include +#include +#include +#include +#include + +#include +#include namespace NKikimr::NReplication::NController { +class TTargetDescriber: public TActorBootstrapped { + void DescribeTarget(ui64 id) { + Y_ABORT_UNLESS(Targets.contains(id)); + Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(Targets.at(id), {}), 0, id); + } + + void Handle(TEvYdbProxy::TEvDescribeTableResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + if (!Targets.contains(ev->Cookie)) { + LOG_W("Unknown describe response" + << ": cookie# " << ev->Cookie); + return; + } + + const auto id = ev->Cookie; + const auto& path = Targets.at(id); + + if (Result.contains(id)) { + LOG_W("Duplicate describe response" + << ": id# " << id + << ", path# " << path); + return; + } + + auto& result = ev->Get()->Result; + if (result.IsSuccess()) { + LOG_D("Describe succeeded" + << ": id# " << id + << ", path# " << path); + Result.emplace(id, std::move(result)); + } else { + LOG_E("Describe failed" + << ": id# " << id + << ", path# " << path + << ", status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + Result.emplace(id, std::nullopt); + } + + if (Result.size() == Targets.size()) { + Send(Parent, new TEvPrivate::TEvDescribeTargetsResult(Sender, ReplicationId, std::move(Result))); + PassAway(); + } + } + +public: + static constexpr 
NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_TARGET_DESCRIBER; + } + + explicit TTargetDescriber(const TActorId& sender, const TActorId& parent, ui64 rid, const TActorId& proxy, THashMap&& targets) + : Sender(sender) + , Parent(parent) + , ReplicationId(rid) + , YdbProxy(proxy) + , Targets (std::move(targets)) + , LogPrefix("TargetDescriber", ReplicationId) + { + } + + void Bootstrap() { + for (const auto& [id, _] : Targets) { + DescribeTarget(id); + } + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvDescribeTableResponse, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Sender; + const TActorId Parent; + const ui64 ReplicationId; + const TActorId YdbProxy; + const THashMap Targets; + const TActorLogPrefix LogPrefix; + + TEvPrivate::TEvDescribeTargetsResult::TResult Result; + +}; // TTargetDescriber + class TController::TTxDescribeReplication: public TTxBase { - TEvController::TEvDescribeReplication::TPtr Ev; + const TActorId Sender; + TEvController::TEvDescribeReplication::TPtr PubEv; + TEvPrivate::TEvDescribeTargetsResult::TPtr PrivEv; + TReplication::TPtr Replication; THolder Result; + THashMap TargetsToDescribe; public: explicit TTxDescribeReplication(TController* self, TEvController::TEvDescribeReplication::TPtr& ev) : TTxBase("TxDescribeReplication", self) - , Ev(ev) + , Sender(ev->Sender) + , PubEv(ev) + { + } + + explicit TTxDescribeReplication(TController* self, TEvPrivate::TEvDescribeTargetsResult::TPtr& ev) + : TTxBase("TxDescribeReplication", self) + , Sender(ev->Get()->Sender) + , PrivEv(ev) { } @@ -19,23 +127,74 @@ class TController::TTxDescribeReplication: public TTxBase { return TXTYPE_DESCRIBE_REPLICATION; } - bool Execute(TTransactionContext&, const TActorContext& ctx) override { - CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + bool Execute(TTransactionContext& txc, 
const TActorContext& ctx) override { + if (PubEv) { + return ExecutePub(txc, ctx); + } else if (PrivEv) { + return ExecutePriv(txc, ctx); + } else { + Y_ABORT("unreachable"); + } + } - const auto& record = Ev->Get()->Record; - Result = MakeHolder(); + bool ExecutePub(TTransactionContext&, const TActorContext& ctx) { + CLOG_D(ctx, "Execute: " << PubEv->Get()->ToString()); + const auto& record = PubEv->Get()->Record; const auto pathId = PathIdFromPathId(record.GetPathId()); - auto replication = Self->Find(pathId); - if (!replication) { + Replication = Self->Find(pathId); + if (!Replication) { + Result = MakeHolder(); + Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND); + return true; + } + + if (record.GetIncludeStats()) { + for (ui64 tid = 0; tid < Replication->GetNextTargetId(); ++tid) { + auto* target = Replication->FindTarget(tid); + if (!target) { + continue; + } + + TargetsToDescribe.emplace(tid, target->GetSrcPath()); + } + + if (TargetsToDescribe) { + return true; + } + } + + return DescribeReplication(Replication); + } + + bool ExecutePriv(TTransactionContext&, const TActorContext& ctx) { + CLOG_D(ctx, "Execute: " << PrivEv->Get()->ToString()); + + const auto rid = PrivEv->Get()->ReplicationId; + + Replication = Self->Find(rid); + if (!Replication) { + Result = MakeHolder(); Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND); return true; } + return DescribeReplication(Replication); + } + + bool DescribeReplication(TReplication::TPtr replication) { + Result = MakeHolder(); Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::SUCCESS); Result->Record.MutableConnectionParams()->CopyFrom(replication->GetConfig().GetSrcConnectionParams()); + using TInitialScanProgress = NYdb::NTable::TChangefeedDescription::TInitialScanProgress; + std::optional totalScanProgress; + + if (PrivEv) { + totalScanProgress = std::make_optional(); + } + for (ui64 tid = 0; tid < 
replication->GetNextTargetId(); ++tid) { auto* target = replication->FindTarget(tid); if (!target) { @@ -49,6 +208,34 @@ class TController::TTxDescribeReplication: public TTxBase { if (target->GetStreamName()) { item.SetSrcStreamName(target->GetStreamName()); } + if (const auto lag = target->GetLag()) { + item.SetLagMilliSeconds(lag->MilliSeconds()); + } + + if (PrivEv) { + const auto& result = PrivEv->Get()->Result; + + auto it = result.find(tid); + if (it == result.end() || !it->second) { + totalScanProgress.reset(); + continue; + } + + const auto& changefeeds = it->second->GetTableDescription().GetChangefeedDescriptions(); + auto* cfPtr = FindIfPtr(changefeeds, [target](const NYdb::NTable::TChangefeedDescription& cf) { + return cf.GetName() == target->GetStreamName(); + }); + + if (!cfPtr || !cfPtr->GetInitialScanProgress()) { + totalScanProgress.reset(); + continue; + } + + item.SetInitialScanProgress(cfPtr->GetInitialScanProgress()->GetProgress()); + if (totalScanProgress) { + *totalScanProgress += *cfPtr->GetInitialScanProgress(); + } + } } auto& state = *Result->Record.MutableState(); @@ -59,6 +246,9 @@ class TController::TTxDescribeReplication: public TTxBase { if (const auto lag = replication->GetLag()) { state.MutableStandBy()->SetLagMilliSeconds(lag->MilliSeconds()); } + if (totalScanProgress) { + state.MutableStandBy()->SetInitialScanProgress(totalScanProgress->GetProgress()); + } break; case TReplication::EState::Done: state.MutableDone(); @@ -78,7 +268,11 @@ class TController::TTxDescribeReplication: public TTxBase { CLOG_D(ctx, "Complete"); if (Result) { - ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + ctx.Send(Sender, Result.Release()); + } else if (TargetsToDescribe) { + Y_ABORT_UNLESS(Replication); + ctx.Register(new TTargetDescriber(Sender, ctx.SelfID, + Replication->GetId(), Replication->GetYdbProxy(), std::move(TargetsToDescribe))); } } @@ -88,4 +282,8 @@ void TController::RunTxDescribeReplication(TEvController::TEvDescribeReplication 
Execute(new TTxDescribeReplication(this, ev), ctx); } +void TController::RunTxDescribeReplication(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxDescribeReplication(this, ev), ctx); +} + } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 383df604b5f4..3ac7f03bda68 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1041,5 +1041,6 @@ message TActivity { BACKUP_LOCAL_PARTITION_READER = 639; STAT_SERVICE_HTTP_REQUEST = 640; BACKUP_PQ_OFFLOAD_ACTOR = 641; + REPLICATION_CONTROLLER_TARGET_DESCRIBER = 642; }; }; diff --git a/ydb/public/api/protos/draft/ydb_replication.proto b/ydb/public/api/protos/draft/ydb_replication.proto index 08a78ca58621..c1421a05e2ab 100644 --- a/ydb/public/api/protos/draft/ydb_replication.proto +++ b/ydb/public/api/protos/draft/ydb_replication.proto @@ -15,6 +15,8 @@ message DescribeReplicationRequest { Ydb.Operations.OperationParams operation_params = 1; // Replication path. string path = 2 [(required) = true]; + // Include statistics. 
+ bool include_stats = 3; } message DescribeReplicationResponse { @@ -42,15 +44,21 @@ message ConnectionParams { } message DescribeReplicationResult { + message Stats { + optional google.protobuf.Duration lag = 1; + optional float initial_scan_progress = 2; + } + message Item { string source_path = 1; string destination_path = 2; optional string source_changefeed_name = 3; uint64 id = 4; + Stats stats = 5; } message RunningState { - optional google.protobuf.Duration lag = 1; + Stats stats = 1; } message ErrorState { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index 9c163bddfd82..d4eda58d6300 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -136,7 +136,7 @@ void TCommandDescribe::Config(TConfig& config) { // Table options config.Opts->AddLongOption("partition-boundaries", "[Table] Show partition key boundaries").StoreTrue(&ShowKeyShardBoundaries) .AddLongName("shard-boundaries"); - config.Opts->AddLongOption("stats", "[Table|Topic] Show table/topic statistics").StoreTrue(&ShowStats); + config.Opts->AddLongOption("stats", "[Table|Topic|Replication] Show table/topic/replication statistics").StoreTrue(&ShowStats); config.Opts->AddLongOption("partition-stats", "[Table|Topic] Show partition statistics").StoreTrue(&ShowPartitionStats); AddDeprecatedJsonOption(config, "(Deprecated, will be removed soon. 
Use --format option instead) [Table] Output in json format"); @@ -402,14 +402,34 @@ int TCommandDescribe::DescribeCoordinationNode(const TDriver& driver) { return PrintDescription(this, OutputFormat, desc, &TCommandDescribe::PrintCoordinationNodeResponsePretty); } +template +static TString ValueOr(const std::optional& value, const U& orValue) { + if (value) { + return TStringBuilder() << *value; + } else { + return TStringBuilder() << orValue; + } +} + +template +static TString ProgressOr(const std::optional& value, const U& orValue) { + if (value) { + return TStringBuilder() << FloatToString(*value, PREC_POINT_DIGITS, 2) << "%"; + } else { + return TStringBuilder() << orValue; + } +} + int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::TDescribeReplicationResult& result) const { const auto& desc = result.GetReplicationDescription(); Cout << Endl << "State: " << desc.GetState(); switch (desc.GetState()) { case NReplication::TReplicationDescription::EState::Running: - if (const auto& lag = desc.GetRunningState().GetLag()) { - Cout << Endl << "Lag: " << *lag; + if (ShowStats) { + const auto& stats = desc.GetRunningState().GetStats(); + Cout << Endl << "Lag: " << ValueOr(stats.GetLag(), "n/a"); + Cout << Endl << "Initial Scan progress: " << ProgressOr(stats.GetInitialScanProgress(), "n/a"); } break; case NReplication::TReplicationDescription::EState::Error: @@ -434,13 +454,24 @@ int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::T } if (const auto& items = desc.GetItems()) { - TPrettyTable table({ "#", "Source", "Changefeed", "Destination" }, TPrettyTableConfig().WithoutRowDelimiters()); + TVector columnNames = { "#", "Source", "Destination", "Changefeed" }; + if (ShowStats) { + columnNames.push_back("Lag"); + columnNames.push_back("Progress"); + } + + TPrettyTable table(columnNames, TPrettyTableConfig().WithoutRowDelimiters()); for (const auto& item : items) { - table.AddRow() + auto& row = table.AddRow() 
.Column(0, item.Id) .Column(1, item.SrcPath) - .Column(2, item.SrcChangefeedName.value_or("n/a")) - .Column(3, item.DstPath); + .Column(2, item.DstPath) + .Column(3, ValueOr(item.SrcChangefeedName, "n/a")); + if (ShowStats) { + row + .Column(4, ValueOr(item.Stats.GetLag(), "n/a")) + .Column(5, ProgressOr(item.Stats.GetInitialScanProgress(), "n/a")); + } } Cout << Endl << "Items:" << Endl << table; } @@ -451,8 +482,12 @@ int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::T int TCommandDescribe::DescribeReplication(const TDriver& driver) { NReplication::TReplicationClient client(driver); - auto result = client.DescribeReplication(Path).ExtractValueSync(); + auto settings = NReplication::TDescribeReplicationSettings() + .IncludeStats(ShowStats); + + auto result = client.DescribeReplication(Path, settings).ExtractValueSync(); ThrowOnError(result); + return PrintDescription(this, OutputFormat, result, &TCommandDescribe::PrintReplicationResponsePretty); } diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp index acfb0484e077..181a3f2d4171 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp @@ -59,15 +59,33 @@ const TOAuthCredentials& TConnectionParams::GetOAuthCredentials() const { return std::get(Credentials_); } -TRunningState::TRunningState(const std::optional& lag) - : Lag_(lag) +static TDuration DurationToDuration(const google::protobuf::Duration& value) { + return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value)); +} + +TStats::TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats) + : Lag_(stats.has_lag() ? std::make_optional(DurationToDuration(stats.lag())) : std::nullopt) + , InitialScanProgress_(stats.has_initial_scan_progress() ? 
std::make_optional(stats.initial_scan_progress()) : std::nullopt) { } -const std::optional& TRunningState::GetLag() const { +const std::optional& TStats::GetLag() const { return Lag_; } +const std::optional& TStats::GetInitialScanProgress() const { + return InitialScanProgress_; +} + +TRunningState::TRunningState(const TStats& stats) + : Stats_(stats) +{ +} + +const TStats& TRunningState::GetStats() const { + return Stats_; +} + class TErrorState::TImpl { public: NYql::TIssues Issues; @@ -87,10 +105,6 @@ const NYql::TIssues& TErrorState::GetIssues() const { return Impl_->Issues; } -TDuration DurationToDuration(const google::protobuf::Duration& value) { - return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value)); -} - template NYql::TIssues IssuesFromMessage(const ::google::protobuf::RepeatedPtrField& message) { NYql::TIssues issues; @@ -107,6 +121,7 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ .Id = item.id(), .SrcPath = item.source_path(), .DstPath = item.destination_path(), + .Stats = TStats(item.stats()), .SrcChangefeedName = item.has_source_changefeed_name() ? std::make_optional(item.source_changefeed_name()) : std::nullopt, }); @@ -114,8 +129,7 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ switch (desc.state_case()) { case Ydb::Replication::DescribeReplicationResult::kRunning: - State_ = TRunningState(desc.running().has_lag() - ? 
std::make_optional(DurationToDuration(desc.running().lag())) : std::nullopt); + State_ = TRunningState(desc.running().stats()); break; case Ydb::Replication::DescribeReplicationResult::kError: @@ -183,6 +197,7 @@ class TReplicationClient::TImpl: public TClientImplCommon(settings); request.set_path(path); + request.set_include_stats(settings.IncludeStats_); auto promise = NThreading::NewPromise(); diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.h b/ydb/public/sdk/cpp/client/draft/ydb_replication.h index a5b732dedac5..ee7db99f197b 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_replication.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.h @@ -11,6 +11,7 @@ namespace Ydb::Replication { class ConnectionParams; class DescribeReplicationResult; + class DescribeReplicationResult_Stats; } namespace NYdb { @@ -25,7 +26,11 @@ namespace NYdb::NReplication { class TDescribeReplicationResult; using TAsyncDescribeReplicationResult = NThreading::TFuture; -struct TDescribeReplicationSettings: public TOperationRequestSettings {}; + +struct TDescribeReplicationSettings: public TOperationRequestSettings { + using TSelf = TDescribeReplicationSettings; + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); +}; struct TStaticCredentials { TString User; @@ -59,15 +64,28 @@ class TConnectionParams: private TCommonClientSettings { > Credentials_; }; -struct TRunningState { +class TStats { public: - TRunningState() = default; - explicit TRunningState(const std::optional& lag); + TStats() = default; + TStats(const Ydb::Replication::DescribeReplicationResult_Stats& stats); const std::optional& GetLag() const; + const std::optional& GetInitialScanProgress() const; private: std::optional Lag_; + std::optional InitialScanProgress_; +}; + +class TRunningState { +public: + TRunningState() = default; + explicit TRunningState(const TStats& stats); + + const TStats& GetStats() const; + +private: + TStats Stats_; }; struct TDoneState {}; @@ -90,6 +108,7 @@ class 
TReplicationDescription { ui64 Id; TString SrcPath; TString DstPath; + TStats Stats; std::optional SrcChangefeedName; }; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index dd5ea2ec3f5d..7067585f5663 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -2397,11 +2397,22 @@ TChangefeedDescription::TChangefeedDescription(const Ydb::Table::ChangefeedDescr : TChangefeedDescription(FromProto(proto)) {} +TChangefeedDescription::TInitialScanProgress::TInitialScanProgress() + : PartsTotal(0) + , PartsCompleted(0) +{} + TChangefeedDescription::TInitialScanProgress::TInitialScanProgress(ui32 total, ui32 completed) : PartsTotal(total) , PartsCompleted(completed) {} +TChangefeedDescription::TInitialScanProgress& TChangefeedDescription::TInitialScanProgress::operator+=(const TInitialScanProgress& other) { + PartsTotal += other.PartsTotal; + PartsCompleted += other.PartsCompleted; + return *this; +} + ui32 TChangefeedDescription::TInitialScanProgress::GetPartsTotal() const { return PartsTotal; } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index f5f5b0ac0b30..4e20f5259f38 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -273,8 +273,11 @@ class TChangefeedDescription { public: class TInitialScanProgress { public: + TInitialScanProgress(); explicit TInitialScanProgress(ui32 total, ui32 completed); + TInitialScanProgress& operator+=(const TInitialScanProgress& other); + ui32 GetPartsTotal() const; ui32 GetPartsCompleted() const; float GetProgress() const; // percentage