From 5e912fb638d9a0776fadb9d150c1d2e4a78750ec Mon Sep 17 00:00:00 2001 From: Polina Volosnikova Date: Wed, 20 Nov 2024 19:04:32 +0000 Subject: [PATCH] add leaky bucket to request reporting --- .../dsproxy/dsproxy_request_reporting.cpp | 80 ++++++++--- .../dsproxy/dsproxy_request_reporting.h | 5 +- .../blobstorage/dsproxy/ut/dsproxy_put_ut.cpp | 2 +- .../ut/dsproxy_request_reporting_ut.cpp | 135 ++++++++++++++++++ ydb/core/blobstorage/dsproxy/ut/ya.make | 1 + .../nodewarden/node_warden_impl.cpp | 12 +- .../blobstorage/nodewarden/node_warden_impl.h | 5 +- ydb/core/protos/config.proto | 30 +++- 8 files changed, 240 insertions(+), 30 deletions(-) create mode 100644 ydb/core/blobstorage/dsproxy/ut/dsproxy_request_reporting_ut.cpp diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.cpp index d84fcec91bee..1854a4afa18d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.cpp @@ -2,49 +2,91 @@ namespace NKikimr { -static std::array, NKikimrBlobStorage::EPutHandleClass_MAX + 1> ReportPutPermissions; -static std::array, NKikimrBlobStorage::EGetHandleClass_MAX + 1> ReportGetPermissions; +static std::array, NKikimrBlobStorage::EPutHandleClass_MAX + 1> ReportPutPermissions; +static std::array, NKikimrBlobStorage::EGetHandleClass_MAX + 1> ReportGetPermissions; bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass) { - return ReportPutPermissions[(ui32)handleClass].exchange(false); + auto level = ReportPutPermissions[(ui32)handleClass].load(); + return ReportPutPermissions[(ui32)handleClass].exchange(std::max((i64)0, (i64)level - 1)) > 0; } bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass) { - return ReportGetPermissions[(ui32)handleClass].exchange(false); + auto level = ReportPutPermissions[(ui32)handleClass].load(); + return ReportGetPermissions[(ui32)handleClass].exchange(std::max((i64)0, (i64)level - 1)) > 0; } class TRequestReportingThrottler : public TActorBootstrapped { public: - TRequestReportingThrottler(const TControlWrapper& reportingDelayMs) - : ReportingDelayMs(reportingDelayMs) - {} + TRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs, + const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs) + : BucketSize(bucketSize) + , LeakDurationMs(leakDurationMs) + , LeakRate(leakRate) + , UpdatingDurationMs(updatingDurationMs) + { + for (std::atomic& permission : ReportPutPermissions) { + permission.store(BucketSize); + } + for (std::atomic& permission : ReportGetPermissions) { + permission.store(BucketSize); + } + } - void Bootstrap() { + void Bootstrap(const TActorContext &ctx) { + for (ui32 i = 0; i < ReportPutPermissions.size(); ++i) { + PutLastUpdates[i] = ctx.Now(); + } + for (ui32 i = 0; i < ReportGetPermissions.size(); ++i) { + GetLastUpdates[i] = ctx.Now(); + } Become(&TThis::StateFunc); - HandleWakeup(); + HandleWakeup(ctx); } STRICT_STFUNC(StateFunc, - cFunc(TEvents::TEvWakeup::EventType, HandleWakeup); + CFunc(TEvents::TEvWakeup::EventType, HandleWakeup); ) private: - void HandleWakeup() { - for (std::atomic& permission : ReportPutPermissions) { - permission.store(true); + void Update(const TInstant& now, TInstant& lastUpdate, std::atomic& bucketLevel) { + ui64 bucketSize = BucketSize.Update(now); + ui64 leakRate = LeakRate.Update(now); + ui64 leakDurationMs = LeakDurationMs.Update(now); + ui64 level = bucketLevel.load(); + + ui64 msSinceLastUpdate = (now - lastUpdate).MilliSeconds(); + ui64 intervalsCount = msSinceLastUpdate / leakDurationMs; + + if (level < bucketSize) { + lastUpdate = now; } - for (std::atomic& permission : ReportGetPermissions) { - permission.store(true); + bucketLevel.store(std::min(level + leakRate * intervalsCount, bucketSize)); + } + + void HandleWakeup(const TActorContext& ctx) { + TInstant now = ctx.Now(); + for (size_t i = 0; i < ReportPutPermissions.size(); ++i) { + Update(now, PutLastUpdates[i], ReportPutPermissions[i]); } - Schedule(TDuration::MilliSeconds(ReportingDelayMs.Update(TActivationContext::Now())), new TEvents::TEvWakeup); + for (size_t i = 0; i < ReportGetPermissions.size(); ++i) { + Update(now, GetLastUpdates[i], ReportGetPermissions[i]); + } + Schedule(TDuration::MilliSeconds(UpdatingDurationMs.Update(now)), new TEvents::TEvWakeup); } private: - TMemorizableControlWrapper ReportingDelayMs; + TMemorizableControlWrapper BucketSize; + TMemorizableControlWrapper LeakDurationMs; + TMemorizableControlWrapper LeakRate; + TMemorizableControlWrapper UpdatingDurationMs; + + std::array PutLastUpdates; + std::array GetLastUpdates; }; -IActor* CreateRequestReportingThrottler(const TControlWrapper& reportingDelayMs) { - return new TRequestReportingThrottler(reportingDelayMs); +IActor* CreateRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs, + const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs) { + return new TRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs); } } // namespace NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h b/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h index 58bc627b6ff2..d35622696852 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h @@ -3,11 +3,14 @@ #include "defs.h" #include +#include + namespace NKikimr { bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass); bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass); -IActor* CreateRequestReportingThrottler(const TControlWrapper& longRequestReportingDelayMs); +IActor* CreateRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs, + const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs); } // namespace NKikimr diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp index 8b7c5459d97c..33889027fb25 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp @@ -415,7 +415,7 @@ void TestPutResultWithVDiskResults(TBlobStorageGroupType type, TMap(CreateRequestReportingThrottler(1)); + auto reportActor = std::unique_ptr(CreateRequestReportingThrottler(1, 60000, 1, 60000)); runtime.Register(reportActor.release()); for (ui64 idx = 0; idx < expectedVdiskRequests; ++idx) { diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_request_reporting_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_request_reporting_ut.cpp new file mode 100644 index 000000000000..816e7def21bd --- /dev/null +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_request_reporting_ut.cpp @@ -0,0 +1,135 @@ +#include "dsproxy_env_mock_ut.h" +#include "dsproxy_test_state_ut.h" + +#include + +#include +#include +#include + +#include +#include +#include + +namespace NKikimr { +namespace NDSProxyRequestReportningTest { + +Y_UNIT_TEST_SUITE(TDSProxyRequestReportningTest) { + +Y_UNIT_TEST(CheckDefaultBehaviour) { + TActorSystemStub actorSystemStub; + TTestBasicRuntime runtime; + SetupRuntime(runtime); + + // allow 1 in 60 seconds + TControlWrapper bucketSize(1, 1, 100000); + TControlWrapper leakDurationMs(60000, 1, 3600000); + TControlWrapper leakRate(1, 1, 100000); + TControlWrapper updatingDurationMs(60000, 1, 3600000); + NActors::TActorId reportingThrottler = runtime.Register(CreateRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs)); + runtime.EnableScheduleForActor(reportingThrottler); + runtime.AdvanceCurrentTime(TDuration::MilliSeconds(10)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob)); + + // 10 seconds after last update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(10000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob)); + + // 50 seconds after last update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(40000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // 61 seconds after last update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(21000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob)); + + // update: + 1 in bucket + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // 1 seconds before update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 - 1000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // 1 seconds after update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); +} + +Y_UNIT_TEST(CheckLeakyBucketBehaviour) { + TActorSystemStub actorSystemStub; + TTestBasicRuntime runtime; + SetupRuntime(runtime); + + TControlWrapper bucketSize(3, 1, 100000); + TControlWrapper leakDurationMs(60000, 1, 3600000); + TControlWrapper leakRate(1, 1, 100000); + TControlWrapper updatingDurationMs(1000, 1, 3600000); + NActors::TActorId reportingThrottler = runtime.Register(CreateRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs)); + runtime.EnableScheduleForActor(reportingThrottler); + runtime.AdvanceCurrentTime(TDuration::MilliSeconds(10)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // 61 seconds after last update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(61000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // 121 seconds after last update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(121000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // 181 seconds after last update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(181000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // update: + 1 in bucket + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // 1 seconds before update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 - 1000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); + + // update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + + // 1 seconds after update + runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000)); + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog)); +} + +} // Y_UNIT_TEST_SUITE TDSProxyRequestReportningTest +} // namespace NDSProxyRequestReportningTest +} // namespace NKikimr diff --git a/ydb/core/blobstorage/dsproxy/ut/ya.make b/ydb/core/blobstorage/dsproxy/ut/ya.make index 830f4832a451..b2782435bcf1 100644 --- a/ydb/core/blobstorage/dsproxy/ut/ya.make +++ b/ydb/core/blobstorage/dsproxy/ut/ya.make @@ -27,6 +27,7 @@ SRCS( dsproxy_sequence_ut.cpp dsproxy_patch_ut.cpp dsproxy_counters_ut.cpp + dsproxy_request_reporting_ut.cpp ) IF (BUILD_TYPE != "DEBUG") diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index e1adacc552c5..11c7f634847b 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -47,7 +47,10 @@ TNodeWarden::TNodeWarden(const TIntrusivePtr &cfg) , MaxNumOfSlowDisksHDD(DefaultMaxNumOfSlowDisks, 1, 2) , MaxNumOfSlowDisksSSD(DefaultMaxNumOfSlowDisks, 1, 2) , LongRequestThresholdMs(50'000, 1, 1'000'000) - , LongRequestReportingDelayMs(60'000, 1, 1'000'000) + , ReportingControllerBucketSize(1, 1, 100'000) + , ReportingControllerLeakDurationMs(60'000, 1, 3'600'000) + , ReportingControllerLeakRate(1, 1, 100'000) + , ReportingControllerUpdatingDurationMs(60'000, 1, 3'600'000) { Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1); AvailDomainId = 1; @@ -280,7 +283,7 @@ void TNodeWarden::StopInvalidGroupProxy() { void TNodeWarden::StartRequestReportingThrottler() { STLOG(PRI_DEBUG, BS_NODE, NW27, "StartRequestReportingThrottler"); - Register(CreateRequestReportingThrottler(LongRequestReportingDelayMs)); + Register(CreateRequestReportingThrottler(ReportingControllerBucketSize, ReportingControllerLeakDurationMs, ReportingControllerLeakRate, ReportingControllerUpdatingDurationMs)); } void TNodeWarden::PassAway() { @@ -363,7 +366,10 @@ void TNodeWarden::Bootstrap() { icb->RegisterSharedControl(MaxNumOfSlowDisksSSD, "DSProxyControls.MaxNumOfSlowDisksSSD"); icb->RegisterSharedControl(LongRequestThresholdMs, "DSProxyControls.LongRequestThresholdMs"); - icb->RegisterSharedControl(LongRequestReportingDelayMs, "DSProxyControls.LongRequestReportingDelayMs"); + icb->RegisterSharedControl(ReportingControllerBucketSize, "DSProxyControls.RequestReportingSettings.BucketSize"); + icb->RegisterSharedControl(ReportingControllerLeakDurationMs, "DSProxyControls.RequestReportingSettings.LeakDurationMs"); + icb->RegisterSharedControl(ReportingControllerLeakRate, "DSProxyControls.RequestReportingSettings.LeakRate"); + icb->RegisterSharedControl(ReportingControllerUpdatingDurationMs, "DSProxyControls.RequestReportingSettings.UpdatingDurationMs"); } // start replication broker diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 02f6e41dba23..292836cd685d 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -191,7 +191,10 @@ namespace NKikimr::NStorage { TControlWrapper MaxNumOfSlowDisksSSD; TControlWrapper LongRequestThresholdMs; - TControlWrapper LongRequestReportingDelayMs; + TControlWrapper ReportingControllerBucketSize; + TControlWrapper ReportingControllerLeakDurationMs; + TControlWrapper ReportingControllerLeakRate; + TControlWrapper ReportingControllerUpdatingDurationMs; public: struct TGroupRecord; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index f3799a244b94..0e927999d56b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1379,6 +1379,29 @@ message TImmediateControlsConfig { } message TDSProxyControls { + message TRequestReportingSettings { + optional uint64 BucketSize = 1 [(ControlOptions) = { + Description: "Capacity for Leaky Bucket algorithm", + MinValue: 1, + MaxValue: 100000, + DefaultValue: 1 }]; + optional uint64 LeakDurationMs = 2 [(ControlOptions) = { + Description: "Leak interval (ms) for Leaky Bucket algorithm", + MinValue: 1, + MaxValue: 3600000, + DefaultValue: 60000 }]; + optional uint64 LeakRate = 3 [(ControlOptions) = { + Description: "Leak size for Leaky Bucket algorithm", + MinValue: 1, + MaxValue: 100000, + DefaultValue: 1 }]; + optional uint64 UpdatingDurationMs = 4 [(ControlOptions) = { + Description: "Interval (ms) for updating Leaky Bucket", + MinValue: 1, + MaxValue: 3600000, + DefaultValue: 60000 }]; + } + optional uint64 SlowDiskThreshold = 1 [(ControlOptions) = { Description: "The minimum ratio of slowest and second slowest disks, required to accelerate, promille", MinValue: 1, @@ -1394,11 +1417,7 @@ message TImmediateControlsConfig { MinValue: 1, MaxValue: 1000000, DefaultValue: 50000 }]; - optional uint64 LongRequestReportingDelayMs = 4 [(ControlOptions) = { - Description: "Delay (ms) between reports of long requests for one handle class", - MinValue: 1, - MaxValue: 1000000, - DefaultValue: 60000 }]; + reserved 4; optional uint64 MaxNumOfSlowDisks = 5 [(ControlOptions) = { Description: "Maximum number of slow disks, which DSProxy can skip with Accelerations", MinValue: 1, @@ -1436,6 +1455,7 @@ message TImmediateControlsConfig { MinValue: 1, MaxValue: 2, DefaultValue: 2 }]; + optional TRequestReportingSettings RequestReportingSettings = 12; } message TPDiskControls {