Skip to content

Commit

Permalink
add leaky bucket to request reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
VPolka committed Nov 21, 2024
1 parent 59fa9fb commit 5e912fb
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 30 deletions.
80 changes: 61 additions & 19 deletions ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,91 @@

namespace NKikimr {

static std::array<std::atomic<bool>, NKikimrBlobStorage::EPutHandleClass_MAX + 1> ReportPutPermissions;
static std::array<std::atomic<bool>, NKikimrBlobStorage::EGetHandleClass_MAX + 1> ReportGetPermissions;
static std::array<std::atomic<ui64>, NKikimrBlobStorage::EPutHandleClass_MAX + 1> ReportPutPermissions;
static std::array<std::atomic<ui64>, NKikimrBlobStorage::EGetHandleClass_MAX + 1> ReportGetPermissions;

bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass) {
return ReportPutPermissions[(ui32)handleClass].exchange(false);
auto level = ReportPutPermissions[(ui32)handleClass].load();
return ReportPutPermissions[(ui32)handleClass].exchange(std::max((i64)0, (i64)level - 1)) > 0;
}

bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass) {
return ReportGetPermissions[(ui32)handleClass].exchange(false);
auto level = ReportPutPermissions[(ui32)handleClass].load();
return ReportGetPermissions[(ui32)handleClass].exchange(std::max((i64)0, (i64)level - 1)) > 0;
}

class TRequestReportingThrottler : public TActorBootstrapped<TRequestReportingThrottler> {
public:
TRequestReportingThrottler(const TControlWrapper& reportingDelayMs)
: ReportingDelayMs(reportingDelayMs)
{}
TRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs,
const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs)
: BucketSize(bucketSize)
, LeakDurationMs(leakDurationMs)
, LeakRate(leakRate)
, UpdatingDurationMs(updatingDurationMs)
{
for (std::atomic<ui64>& permission : ReportPutPermissions) {
permission.store(BucketSize);
}
for (std::atomic<ui64>& permission : ReportGetPermissions) {
permission.store(BucketSize);
}
}

void Bootstrap() {
void Bootstrap(const TActorContext &ctx) {
for (ui32 i = 0; i < ReportPutPermissions.size(); ++i) {
PutLastUpdates[i] = ctx.Now();
}
for (ui32 i = 0; i < ReportGetPermissions.size(); ++i) {
GetLastUpdates[i] = ctx.Now();
}
Become(&TThis::StateFunc);
HandleWakeup();
HandleWakeup(ctx);
}

STRICT_STFUNC(StateFunc,
cFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
CFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
)

private:
void HandleWakeup() {
for (std::atomic<bool>& permission : ReportPutPermissions) {
permission.store(true);
void Update(const TInstant& now, TInstant& lastUpdate, std::atomic<ui64>& bucketLevel) {
ui64 bucketSize = BucketSize.Update(now);
ui64 leakRate = LeakRate.Update(now);
ui64 leakDurationMs = LeakDurationMs.Update(now);
ui64 level = bucketLevel.load();

ui64 msSinceLastUpdate = (now - lastUpdate).MilliSeconds();
ui64 intervalsCount = msSinceLastUpdate / leakDurationMs;

if (level < bucketSize) {
lastUpdate = now;
}
for (std::atomic<bool>& permission : ReportGetPermissions) {
permission.store(true);
bucketLevel.store(std::min(level + leakRate * intervalsCount, bucketSize));
}

void HandleWakeup(const TActorContext& ctx) {
TInstant now = ctx.Now();
for (size_t i = 0; i < ReportPutPermissions.size(); ++i) {
Update(now, PutLastUpdates[i], ReportPutPermissions[i]);
}
Schedule(TDuration::MilliSeconds(ReportingDelayMs.Update(TActivationContext::Now())), new TEvents::TEvWakeup);
for (size_t i = 0; i < ReportGetPermissions.size(); ++i) {
Update(now, GetLastUpdates[i], ReportGetPermissions[i]);
}
Schedule(TDuration::MilliSeconds(UpdatingDurationMs.Update(now)), new TEvents::TEvWakeup);
}

private:
TMemorizableControlWrapper ReportingDelayMs;
TMemorizableControlWrapper BucketSize;
TMemorizableControlWrapper LeakDurationMs;
TMemorizableControlWrapper LeakRate;
TMemorizableControlWrapper UpdatingDurationMs;

std::array<TInstant, NKikimrBlobStorage::EPutHandleClass_MAX + 1> PutLastUpdates;
std::array<TInstant, NKikimrBlobStorage::EGetHandleClass_MAX + 1> GetLastUpdates;
};

IActor* CreateRequestReportingThrottler(const TControlWrapper& reportingDelayMs) {
return new TRequestReportingThrottler(reportingDelayMs);
IActor* CreateRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs,
const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs) {
return new TRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs);
}

} // namespace NKikimr
5 changes: 4 additions & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
#include "defs.h"
#include <array>

#include <ydb/core/util/counted_leaky_bucket.h>

namespace NKikimr {

bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass);
bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass);

IActor* CreateRequestReportingThrottler(const TControlWrapper& longRequestReportingDelayMs);
IActor* CreateRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs,
const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs);

} // namespace NKikimr
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void TestPutResultWithVDiskResults(TBlobStorageGroupType type, TMap<TVDiskID, NK
auto putActor = env.CreatePutRequestActor(ev);
runtime.Register(putActor.release());

auto reportActor = std::unique_ptr<IActor>(CreateRequestReportingThrottler(1));
auto reportActor = std::unique_ptr<IActor>(CreateRequestReportingThrottler(1, 60000, 1, 60000));
runtime.Register(reportActor.release());

for (ui64 idx = 0; idx < expectedVdiskRequests; ++idx) {
Expand Down
135 changes: 135 additions & 0 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_request_reporting_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "dsproxy_env_mock_ut.h"
#include "dsproxy_test_state_ut.h"

#include <ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h>

#include <ydb/core/testlib/basics/runtime.h>
#include <ydb/core/testlib/actor_helpers.h>
#include <ydb/core/testlib/basics/appdata.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/common/env.h>

namespace NKikimr {
namespace NDSProxyRequestReportningTest {

Y_UNIT_TEST_SUITE(TDSProxyRequestReportningTest) {

Y_UNIT_TEST(CheckDefaultBehaviour) {
TActorSystemStub actorSystemStub;
TTestBasicRuntime runtime;
SetupRuntime(runtime);

// allow 1 in 60 seconds
TControlWrapper bucketSize(1, 1, 100000);
TControlWrapper leakDurationMs(60000, 1, 3600000);
TControlWrapper leakRate(1, 1, 100000);
TControlWrapper updatingDurationMs(60000, 1, 3600000);
NActors::TActorId reportingThrottler = runtime.Register(CreateRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs));
runtime.EnableScheduleForActor(reportingThrottler);
runtime.AdvanceCurrentTime(TDuration::MilliSeconds(10));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob));

// 10 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(10000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob));

// 50 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(40000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 61 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(21000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob));

// update: + 1 in bucket
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds before update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 - 1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds after update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
}

Y_UNIT_TEST(CheckLeakyBucketBehaviour) {
TActorSystemStub actorSystemStub;
TTestBasicRuntime runtime;
SetupRuntime(runtime);

TControlWrapper bucketSize(3, 1, 100000);
TControlWrapper leakDurationMs(60000, 1, 3600000);
TControlWrapper leakRate(1, 1, 100000);
TControlWrapper updatingDurationMs(1000, 1, 3600000);
NActors::TActorId reportingThrottler = runtime.Register(CreateRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs));
runtime.EnableScheduleForActor(reportingThrottler);
runtime.AdvanceCurrentTime(TDuration::MilliSeconds(10));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 61 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(61000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 121 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(121000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 181 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(181000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// update: + 1 in bucket
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds before update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 - 1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds after update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
}

} // Y_UNIT_TEST_SUITE TDSProxyRequestReportningTest
} // namespace NDSProxyRequestReportningTest
} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SRCS(
dsproxy_sequence_ut.cpp
dsproxy_patch_ut.cpp
dsproxy_counters_ut.cpp
dsproxy_request_reporting_ut.cpp
)

IF (BUILD_TYPE != "DEBUG")
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ TNodeWarden::TNodeWarden(const TIntrusivePtr<TNodeWardenConfig> &cfg)
, MaxNumOfSlowDisksHDD(DefaultMaxNumOfSlowDisks, 1, 2)
, MaxNumOfSlowDisksSSD(DefaultMaxNumOfSlowDisks, 1, 2)
, LongRequestThresholdMs(50'000, 1, 1'000'000)
, LongRequestReportingDelayMs(60'000, 1, 1'000'000)
, ReportingControllerBucketSize(1, 1, 100'000)
, ReportingControllerLeakDurationMs(60'000, 1, 3'600'000)
, ReportingControllerLeakRate(1, 1, 100'000)
, ReportingControllerUpdatingDurationMs(60'000, 1, 3'600'000)
{
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
AvailDomainId = 1;
Expand Down Expand Up @@ -280,7 +283,7 @@ void TNodeWarden::StopInvalidGroupProxy() {

void TNodeWarden::StartRequestReportingThrottler() {
STLOG(PRI_DEBUG, BS_NODE, NW27, "StartRequestReportingThrottler");
Register(CreateRequestReportingThrottler(LongRequestReportingDelayMs));
Register(CreateRequestReportingThrottler(ReportingControllerBucketSize, ReportingControllerLeakDurationMs, ReportingControllerLeakRate, ReportingControllerUpdatingDurationMs));
}

void TNodeWarden::PassAway() {
Expand Down Expand Up @@ -363,7 +366,10 @@ void TNodeWarden::Bootstrap() {
icb->RegisterSharedControl(MaxNumOfSlowDisksSSD, "DSProxyControls.MaxNumOfSlowDisksSSD");

icb->RegisterSharedControl(LongRequestThresholdMs, "DSProxyControls.LongRequestThresholdMs");
icb->RegisterSharedControl(LongRequestReportingDelayMs, "DSProxyControls.LongRequestReportingDelayMs");
icb->RegisterSharedControl(ReportingControllerBucketSize, "DSProxyControls.RequestReportingSettings.BucketSize");
icb->RegisterSharedControl(ReportingControllerLeakDurationMs, "DSProxyControls.RequestReportingSettings.LeakDurationMs");
icb->RegisterSharedControl(ReportingControllerLeakRate, "DSProxyControls.RequestReportingSettings.LeakRate");
icb->RegisterSharedControl(ReportingControllerUpdatingDurationMs, "DSProxyControls.RequestReportingSettings.UpdatingDurationMs");
}

// start replication broker
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ namespace NKikimr::NStorage {
TControlWrapper MaxNumOfSlowDisksSSD;

TControlWrapper LongRequestThresholdMs;
TControlWrapper LongRequestReportingDelayMs;
TControlWrapper ReportingControllerBucketSize;
TControlWrapper ReportingControllerLeakDurationMs;
TControlWrapper ReportingControllerLeakRate;
TControlWrapper ReportingControllerUpdatingDurationMs;

public:
struct TGroupRecord;
Expand Down
30 changes: 25 additions & 5 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,29 @@ message TImmediateControlsConfig {
}

message TDSProxyControls {
message TRequestReportingSettings {
optional uint64 BucketSize = 1 [(ControlOptions) = {
Description: "Capacity for Leaky Bucket algorithm",
MinValue: 1,
MaxValue: 100000,
DefaultValue: 1 }];
optional uint64 LeakDurationMs = 2 [(ControlOptions) = {
Description: "Leak interval (ms) for Leaky Bucket algorithm",
MinValue: 1,
MaxValue: 3600000,
DefaultValue: 60000 }];
optional uint64 LeakRate = 3 [(ControlOptions) = {
Description: "Leak size for Leaky Bucket algorithm",
MinValue: 1,
MaxValue: 100000,
DefaultValue: 1 }];
optional uint64 UpdatingDurationMs = 4 [(ControlOptions) = {
Description: "Interval (ms) for updating Leaky Bucket",
MinValue: 1,
MaxValue: 3600000,
DefaultValue: 60000 }];
}

optional uint64 SlowDiskThreshold = 1 [(ControlOptions) = {
Description: "The minimum ratio of slowest and second slowest disks, required to accelerate, promille",
MinValue: 1,
Expand All @@ -1394,11 +1417,7 @@ message TImmediateControlsConfig {
MinValue: 1,
MaxValue: 1000000,
DefaultValue: 50000 }];
optional uint64 LongRequestReportingDelayMs = 4 [(ControlOptions) = {
Description: "Delay (ms) between reports of long requests for one handle class",
MinValue: 1,
MaxValue: 1000000,
DefaultValue: 60000 }];
reserved 4;
optional uint64 MaxNumOfSlowDisks = 5 [(ControlOptions) = {
Description: "Maximum number of slow disks, which DSProxy can skip with Accelerations",
MinValue: 1,
Expand Down Expand Up @@ -1436,6 +1455,7 @@ message TImmediateControlsConfig {
MinValue: 1,
MaxValue: 2,
DefaultValue: 2 }];
optional TRequestReportingSettings RequestReportingSettings = 12;
}

message TPDiskControls {
Expand Down

0 comments on commit 5e912fb

Please sign in to comment.