Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add leaky bucket to request reporting #11817

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 72 additions & 19 deletions ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,102 @@

namespace NKikimr {

static std::array<std::atomic<bool>, NKikimrBlobStorage::EPutHandleClass_MAX + 1> ReportPutPermissions;
static std::array<std::atomic<bool>, NKikimrBlobStorage::EGetHandleClass_MAX + 1> ReportGetPermissions;
struct TReportLeakBucket {
ui64 Level;
TInstant LastUpdate;
TMutex Lock;
};

static std::array<TReportLeakBucket, NKikimrBlobStorage::EPutHandleClass_MAX + 1> ReportPutPermissions;
static std::array<TReportLeakBucket, NKikimrBlobStorage::EGetHandleClass_MAX + 1> ReportGetPermissions;

bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass) {
return ReportPutPermissions[(ui32)handleClass].exchange(false);
auto& permission = ReportPutPermissions[(ui32)handleClass];
TGuard<TMutex> guard(permission.Lock);
auto level = permission.Level;
permission.Level = std::max((i64)0, (i64)level - 1);
return level > 0;
}

bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass) {
return ReportGetPermissions[(ui32)handleClass].exchange(false);
auto& permission = ReportGetPermissions[(ui32)handleClass];
TGuard<TMutex> guard(permission.Lock);
auto level = permission.Level;
permission.Level = std::max((i64)0, (i64)level - 1);
return level > 0;
}

class TRequestReportingThrottler : public TActorBootstrapped<TRequestReportingThrottler> {
public:
TRequestReportingThrottler(const TControlWrapper& reportingDelayMs)
: ReportingDelayMs(reportingDelayMs)
{}
TRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs,
const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs)
: BucketSize(bucketSize)
, LeakDurationMs(leakDurationMs)
, LeakRate(leakRate)
, UpdatingDurationMs(updatingDurationMs)
{
for (auto& permission : ReportPutPermissions) {
permission.Level = BucketSize;
}
for (auto& permission : ReportGetPermissions) {
permission.Level = BucketSize;
}
}

void Bootstrap() {
void Bootstrap(const TActorContext &ctx) {
for (auto& permission : ReportPutPermissions) {
permission.LastUpdate = ctx.Now();
}
for (auto& permission : ReportGetPermissions) {
permission.LastUpdate = ctx.Now();
}
Become(&TThis::StateFunc);
HandleWakeup();
HandleWakeup(ctx);
}

STRICT_STFUNC(StateFunc,
cFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
CFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
)

private:
void HandleWakeup() {
for (std::atomic<bool>& permission : ReportPutPermissions) {
permission.store(true);
void Update(const TInstant& now, TInstant& lastUpdate, ui64& bucketLevel) {
ui64 bucketSize = BucketSize.Update(now);
ui64 leakRate = LeakRate.Update(now);
ui64 leakDurationMs = LeakDurationMs.Update(now);

ui64 msSinceLastUpdate = (now - lastUpdate).MilliSeconds();
ui64 intervalsCount = msSinceLastUpdate / leakDurationMs;
if (bucketLevel == bucketSize) {
lastUpdate = now;
return;
}
lastUpdate += TDuration::MilliSeconds(intervalsCount * leakDurationMs);
bucketLevel = std::min(bucketLevel + leakRate * intervalsCount, bucketSize);
}

void HandleWakeup(const TActorContext& ctx) {
TInstant now = ctx.Now();
for (auto& permission : ReportPutPermissions) {
TGuard<TMutex> guard(permission.Lock);
Update(now, permission.LastUpdate, permission.Level);
}
for (std::atomic<bool>& permission : ReportGetPermissions) {
permission.store(true);
for (auto& permission : ReportGetPermissions) {
TGuard<TMutex> guard(permission.Lock);
Update(now, permission.LastUpdate, permission.Level);
}
Schedule(TDuration::MilliSeconds(ReportingDelayMs.Update(TActivationContext::Now())), new TEvents::TEvWakeup);
Schedule(TDuration::MilliSeconds(UpdatingDurationMs.Update(now)), new TEvents::TEvWakeup);
}

private:
TMemorizableControlWrapper ReportingDelayMs;
TMemorizableControlWrapper BucketSize;
TMemorizableControlWrapper LeakDurationMs;
TMemorizableControlWrapper LeakRate;
TMemorizableControlWrapper UpdatingDurationMs;
};

IActor* CreateRequestReportingThrottler(const TControlWrapper& reportingDelayMs) {
return new TRequestReportingThrottler(reportingDelayMs);
IActor* CreateRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs,
const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs) {
return new TRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs);
}

} // namespace NKikimr
5 changes: 4 additions & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
#include "defs.h"
#include <array>

#include <ydb/core/util/counted_leaky_bucket.h>

namespace NKikimr {

bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass);
bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass);

IActor* CreateRequestReportingThrottler(const TControlWrapper& longRequestReportingDelayMs);
IActor* CreateRequestReportingThrottler(const TControlWrapper& bucketSize, const TControlWrapper& leakDurationMs,
const TControlWrapper& leakRate, const TControlWrapper& updatingDurationMs);

} // namespace NKikimr
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void TestPutResultWithVDiskResults(TBlobStorageGroupType type, TMap<TVDiskID, NK
auto putActor = env.CreatePutRequestActor(ev);
runtime.Register(putActor.release());

auto reportActor = std::unique_ptr<IActor>(CreateRequestReportingThrottler(1));
auto reportActor = std::unique_ptr<IActor>(CreateRequestReportingThrottler(1, 60000, 1, 60000));
runtime.Register(reportActor.release());

for (ui64 idx = 0; idx < expectedVdiskRequests; ++idx) {
Expand Down
141 changes: 141 additions & 0 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_request_reporting_ut.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include "dsproxy_env_mock_ut.h"
#include "dsproxy_test_state_ut.h"

#include <ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h>

#include <ydb/core/testlib/basics/runtime.h>
#include <ydb/core/testlib/actor_helpers.h>
#include <ydb/core/testlib/basics/appdata.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/common/env.h>

namespace NKikimr {
namespace NDSProxyRequestReportningTest {

Y_UNIT_TEST_SUITE(TDSProxyRequestReportningTest) {

Y_UNIT_TEST(CheckDefaultBehaviour) {
TActorSystemStub actorSystemStub;
TTestBasicRuntime runtime;
SetupRuntime(runtime);

// allow 1 in 60 seconds
TControlWrapper bucketSize(1, 1, 100000);
TControlWrapper leakDurationMs(60000, 1, 3600000);
TControlWrapper leakRate(1, 1, 100000);
TControlWrapper updatingDurationMs(60000, 1, 3600000);
NActors::TActorId reportingThrottler = runtime.Register(CreateRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs));
runtime.EnableScheduleForActor(reportingThrottler);
runtime.AdvanceCurrentTime(TDuration::MilliSeconds(10));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob));

// 10 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(10000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob));

// 50 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(40000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 61 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(21000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::AsyncBlob));

// Check more than 1 request allowed for duration < 60000

// update: + 1 in bucket
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds before update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 - 1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds after update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
}

Y_UNIT_TEST(CheckLeakyBucketBehaviour) {
TActorSystemStub actorSystemStub;
TTestBasicRuntime runtime;
SetupRuntime(runtime);

TControlWrapper bucketSize(3, 1, 100000);
TControlWrapper leakDurationMs(60000, 1, 3600000);
TControlWrapper leakRate(1, 1, 100000);
TControlWrapper updatingDurationMs(1000, 1, 3600000);
NActors::TActorId reportingThrottler = runtime.Register(CreateRequestReportingThrottler(bucketSize, leakDurationMs, leakRate, updatingDurationMs));
runtime.EnableScheduleForActor(reportingThrottler);
runtime.AdvanceCurrentTime(TDuration::MilliSeconds(10));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 61 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(61000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 121 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(121000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// 181 seconds after last update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(181000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// Check no more than 3 request allowed for duration < 60000

// update: + 3 in bucket
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 * 3));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds before update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(60000 - 1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
UNIT_ASSERT(AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));

// update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));

// 1 seconds after update
runtime.UpdateCurrentTime(runtime.GetCurrentTime() + TDuration::MilliSeconds(1000));
runtime.SimulateSleep(TDuration::MilliSeconds(1));
UNIT_ASSERT(!AllowToReport(NKikimrBlobStorage::EPutHandleClass::TabletLog));
}

} // Y_UNIT_TEST_SUITE TDSProxyRequestReportningTest
} // namespace NDSProxyRequestReportningTest
} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ SRCS(
dsproxy_sequence_ut.cpp
dsproxy_patch_ut.cpp
dsproxy_counters_ut.cpp
dsproxy_request_reporting_ut.cpp
)

IF (BUILD_TYPE != "DEBUG")
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ TNodeWarden::TNodeWarden(const TIntrusivePtr<TNodeWardenConfig> &cfg)
, MaxNumOfSlowDisksHDD(DefaultMaxNumOfSlowDisks, 1, 2)
, MaxNumOfSlowDisksSSD(DefaultMaxNumOfSlowDisks, 1, 2)
, LongRequestThresholdMs(50'000, 1, 1'000'000)
, LongRequestReportingDelayMs(60'000, 1, 1'000'000)
, ReportingControllerBucketSize(1, 1, 100'000)
, ReportingControllerLeakDurationMs(60'000, 1, 3'600'000)
, ReportingControllerLeakRate(1, 1, 100'000)
, ReportingControllerUpdatingDurationMs(60'000, 1, 3'600'000)
{
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
AvailDomainId = 1;
Expand Down Expand Up @@ -280,7 +283,7 @@ void TNodeWarden::StopInvalidGroupProxy() {

void TNodeWarden::StartRequestReportingThrottler() {
STLOG(PRI_DEBUG, BS_NODE, NW27, "StartRequestReportingThrottler");
Register(CreateRequestReportingThrottler(LongRequestReportingDelayMs));
Register(CreateRequestReportingThrottler(ReportingControllerBucketSize, ReportingControllerLeakDurationMs, ReportingControllerLeakRate, ReportingControllerUpdatingDurationMs));
}

void TNodeWarden::PassAway() {
Expand Down Expand Up @@ -363,7 +366,10 @@ void TNodeWarden::Bootstrap() {
icb->RegisterSharedControl(MaxNumOfSlowDisksSSD, "DSProxyControls.MaxNumOfSlowDisksSSD");

icb->RegisterSharedControl(LongRequestThresholdMs, "DSProxyControls.LongRequestThresholdMs");
icb->RegisterSharedControl(LongRequestReportingDelayMs, "DSProxyControls.LongRequestReportingDelayMs");
icb->RegisterSharedControl(ReportingControllerBucketSize, "DSProxyControls.RequestReportingSettings.BucketSize");
icb->RegisterSharedControl(ReportingControllerLeakDurationMs, "DSProxyControls.RequestReportingSettings.LeakDurationMs");
icb->RegisterSharedControl(ReportingControllerLeakRate, "DSProxyControls.RequestReportingSettings.LeakRate");
icb->RegisterSharedControl(ReportingControllerUpdatingDurationMs, "DSProxyControls.RequestReportingSettings.UpdatingDurationMs");
}

// start replication broker
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ namespace NKikimr::NStorage {
TControlWrapper MaxNumOfSlowDisksSSD;

TControlWrapper LongRequestThresholdMs;
TControlWrapper LongRequestReportingDelayMs;
TControlWrapper ReportingControllerBucketSize;
TControlWrapper ReportingControllerLeakDurationMs;
TControlWrapper ReportingControllerLeakRate;
TControlWrapper ReportingControllerUpdatingDurationMs;

public:
struct TGroupRecord;
Expand Down
30 changes: 25 additions & 5 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,29 @@ message TImmediateControlsConfig {
}

message TDSProxyControls {
message TRequestReportingSettings {
optional uint64 BucketSize = 1 [(ControlOptions) = {
Description: "Capacity for Leaky Bucket algorithm",
MinValue: 1,
MaxValue: 100000,
DefaultValue: 1 }];
optional uint64 LeakDurationMs = 2 [(ControlOptions) = {
Description: "Leak interval (ms) for Leaky Bucket algorithm",
MinValue: 1,
MaxValue: 3600000,
DefaultValue: 60000 }];
optional uint64 LeakRate = 3 [(ControlOptions) = {
Description: "Leak size for Leaky Bucket algorithm",
MinValue: 1,
MaxValue: 100000,
DefaultValue: 1 }];
optional uint64 UpdatingDurationMs = 4 [(ControlOptions) = {
Description: "Interval (ms) for updating Leaky Bucket",
MinValue: 1,
MaxValue: 3600000,
DefaultValue: 60000 }];
}

optional uint64 SlowDiskThreshold = 1 [(ControlOptions) = {
Description: "The minimum ratio of slowest and second slowest disks, required to accelerate, promille",
MinValue: 1,
Expand All @@ -1394,11 +1417,7 @@ message TImmediateControlsConfig {
MinValue: 1,
MaxValue: 1000000,
DefaultValue: 50000 }];
optional uint64 LongRequestReportingDelayMs = 4 [(ControlOptions) = {
Description: "Delay (ms) between reports of long requests for one handle class",
MinValue: 1,
MaxValue: 1000000,
DefaultValue: 60000 }];
reserved 4;
optional uint64 MaxNumOfSlowDisks = 5 [(ControlOptions) = {
Description: "Maximum number of slow disks, which DSProxy can skip with Accelerations",
MinValue: 1,
Expand Down Expand Up @@ -1436,6 +1455,7 @@ message TImmediateControlsConfig {
MinValue: 1,
MaxValue: 2,
DefaultValue: 2 }];
optional TRequestReportingSettings RequestReportingSettings = 12;
}

message TPDiskControls {
Expand Down
Loading