Skip to content

Commit

Permalink
Replication stats (total & per item): lag, initial scan progress (ydb…
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jul 1, 2024
1 parent 3a2566e commit a48b189
Show file tree
Hide file tree
Showing 18 changed files with 416 additions and 34 deletions.
13 changes: 12 additions & 1 deletion ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio

auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
PathIdFromPathId(pathId, ev->Record.MutablePathId());
ev->Record.SetIncludeStats(GetProtoRequest()->include_stats());

NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
Become(&TDescribeReplicationRPC::StateDescribeReplication);
Expand Down Expand Up @@ -167,16 +168,26 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
if (from.HasSrcStreamName()) {
to.set_source_changefeed_name(from.GetSrcStreamName());
}
if (from.HasLagMilliSeconds()) {
*to.mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
from.GetLagMilliSeconds());
}
if (from.HasInitialScanProgress()) {
to.mutable_stats()->set_initial_scan_progress(from.GetInitialScanProgress());
}
}

static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
switch (from.GetStateCase()) {
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
if (from.GetStandBy().HasLagMilliSeconds()) {
*to.mutable_running()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
*to.mutable_running()->mutable_stats()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
from.GetStandBy().GetLagMilliSeconds());
}
if (from.GetStandBy().HasInitialScanProgress()) {
to.mutable_running()->mutable_stats()->set_initial_scan_progress(from.GetStandBy().GetInitialScanProgress());
}
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
Expand Down
31 changes: 31 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5485,7 +5485,10 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}

Y_UNIT_TEST(CreateAsyncReplicationWithSecret) {
using namespace NReplication;

TKikimrRunner kikimr("root@builtin");
auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

Expand Down Expand Up @@ -5529,6 +5532,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

Sleep(TDuration::Seconds(1));
}

while (true) {
auto settings = TDescribeReplicationSettings().IncludeStats(true);
const auto result = repl.DescribeReplication("/Root/replication", settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

const auto& desc = result.GetReplicationDescription();
UNIT_ASSERT_VALUES_EQUAL(desc.GetState(), TReplicationDescription::EState::Running);

const auto& total = desc.GetRunningState().GetStats();
if (!total.GetInitialScanProgress() || *total.GetInitialScanProgress() < 100) {
Sleep(TDuration::Seconds(1));
continue;
}

UNIT_ASSERT(total.GetInitialScanProgress());
UNIT_ASSERT_DOUBLES_EQUAL(*total.GetInitialScanProgress(), 100.0, 0.01);

const auto& items = desc.GetItems();
UNIT_ASSERT_VALUES_EQUAL(items.size(), 1);
const auto& item = items.at(0).Stats;

UNIT_ASSERT(item.GetInitialScanProgress());
UNIT_ASSERT_DOUBLES_EQUAL(*item.GetInitialScanProgress(), *total.GetInitialScanProgress(), 0.01);

// TODO: check lag too
break;
}
}

Y_UNIT_TEST(AlterAsyncReplication) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@ message TReplicationConfig {

message TTargetSpecific {
message TTarget {
// in/out
optional string SrcPath = 1;
optional string DstPath = 2;
optional string SrcStreamName = 3;
// out
optional uint64 Id = 4;
optional uint32 LagMilliSeconds = 5;
optional float InitialScanProgress = 6; // pencentage
}

repeated TTarget Targets = 1;
Expand All @@ -59,6 +63,7 @@ message TReplicationConfig {
message TReplicationState {
message TStandBy {
optional uint32 LagMilliSeconds = 1;
optional float InitialScanProgress = 2; // pencentage
}

message TPaused {
Expand Down Expand Up @@ -147,6 +152,7 @@ message TEvDropReplicationResult {

message TEvDescribeReplication {
optional NKikimrProto.TPathID PathId = 1;
optional bool IncludeStats = 2;
}

message TEvDescribeReplicationResult {
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle);
HFunc(TEvPrivate::TEvProcessQueues, Handle);
HFunc(TEvPrivate::TEvRemoveWorker, Handle);
HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle);
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
HFunc(TEvDiscovery::TEvError, Handle);
HFunc(TEvService::TEvStatus, Handle);
Expand Down Expand Up @@ -132,6 +133,11 @@ void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const
RunTxDescribeReplication(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDescribeReplication(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDiscoveryTargetsResult(ev, ctx);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class TController
void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -128,6 +129,7 @@ class TController
void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDescribeReplication(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/replication/controller/private_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,19 @@ TString TEvPrivate::TEvRemoveWorker::ToString() const {
<< " }";
}

TEvPrivate::TEvDescribeTargetsResult::TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result)
: Sender(sender)
, ReplicationId(rid)
, Result(std::move(result))
{
}

TString TEvPrivate::TEvDescribeTargetsResult::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " ReplicationId: " << ReplicationId
<< " }";
}

}

Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) {
Expand Down
17 changes: 17 additions & 0 deletions ydb/core/tx/replication/controller/private_events.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#pragma once

#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>

#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>
#include <ydb/core/scheme/scheme_pathid.h>
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/core/tx/replication/common/worker_id.h>

#include <util/generic/hash.h>

#include <optional>

namespace NKikimr::NReplication::NController {

struct TEvPrivate {
Expand All @@ -25,6 +30,7 @@ struct TEvPrivate {
EvResolveSecretResult,
EvAlterDstResult,
EvRemoveWorker,
EvDescribeTargetsResult,

EvEnd,
};
Expand Down Expand Up @@ -191,6 +197,17 @@ struct TEvPrivate {
TString ToString() const override;
};

struct TEvDescribeTargetsResult: public TEventLocal<TEvDescribeTargetsResult, EvDescribeTargetsResult> {
using TResult = THashMap<ui64, std::optional<NYdb::NTable::TDescribeTableResult>>;

const TActorId Sender;
const ui64 ReplicationId;
TResult Result;

explicit TEvDescribeTargetsResult(const TActorId& sender, ui64 rid, TResult&& result);
TString ToString() const override;
};

}; // TEvPrivate

}
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
virtual void AddWorker(ui64 id) = 0;
virtual void RemoveWorker(ui64 id) = 0;
virtual void UpdateLag(ui64 workerId, TDuration lag) = 0;
virtual const TMaybe<TDuration> GetLag() const = 0;

virtual void Progress(const TActorContext& ctx) = 0;
virtual void Shutdown(const TActorContext& ctx) = 0;
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/replication/controller/target_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,14 @@ void TTargetBase::UpdateLag(ui64 workerId, TDuration lag) {
}

if (TLagProvider::UpdateLag(it->second, workerId, lag)) {
Replication->UpdateLag(GetId(), GetLag().GetRef());
Replication->UpdateLag(GetId(), TLagProvider::GetLag().GetRef());
}
}

const TMaybe<TDuration> TTargetBase::GetLag() const {
return TLagProvider::GetLag();
}

void TTargetBase::Progress(const TActorContext& ctx) {
switch (DstState) {
case EDstState::Creating:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/target_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TTargetBase
void AddWorker(ui64 id) override;
void RemoveWorker(ui64 id) override;
void UpdateLag(ui64 workerId, TDuration lag) override;
const TMaybe<TDuration> GetLag() const override;

void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;
Expand Down
Loading

0 comments on commit a48b189

Please sign in to comment.