Skip to content

Commit

Permalink
duplicated code in tests (ydb-platform#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Mar 26, 2024
1 parent b2f7f35 commit d227d31
Showing 1 changed file with 67 additions and 115 deletions.
182 changes: 67 additions & 115 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
void SendGetWriteInfo(ui32 internalPartitionId);
void ShadowPartitionCountersTest(bool isFirstClass);

void TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline);

TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;

Expand Down Expand Up @@ -1042,6 +1044,69 @@ void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) {

}

void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(quotaWaitDuration.MilliSeconds());
Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);

CreatePartition({
.Partition=TPartitionId{1},
.Begin=0, .End=10,
//
// partition configuration
//
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
},
//
// tablet configuration
//
{.Version=2, .Consumers={{.Consumer="client-1"}}});

SendSubDomainStatus(true);

ui64 cookie = 1;
ui64 messageNo = 0;

SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
UNIT_ASSERT(ownerEvent != nullptr);
auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();

TAutoPtr<IEventHandle> handle;
std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) {
return cookie == e.Cookie;
};

TString data = "data for write";

// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline);
messageNo++;

SendDiskStatusResponse();
{
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
}

// Second message will not be processed because the limit is exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline);
messageNo++;

SendDiskStatusResponse();
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event == nullptr);

// SudDomain quota available - second message will be processed..
SendSubDomainStatus(false);
SendDiskStatusResponse();

event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus());
}

Y_UNIT_TEST_F(Batching, TPartitionFixture)
{
CreatePartition();
Expand Down Expand Up @@ -1666,125 +1731,12 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture)

Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
// disable write request expiration while thes wait quota
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(0);
Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);

CreatePartition({
.Partition=TPartitionId{1},
.Begin=0, .End=10,
//
// partition configuration
//
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
},
//
// tablet configuration
//
{.Version=2, .Consumers={{.Consumer="client-1"}}});

SendSubDomainStatus(true);

ui64 cookie = 1;
ui64 messageNo = 0;

SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
UNIT_ASSERT(ownerEvent != nullptr);
auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();

TAutoPtr<IEventHandle> handle;
std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) {
return cookie == e.Cookie;
};

TString data = "data for write";

// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
messageNo++;

SendDiskStatusResponse();
{
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
}

// Second message will not be processed because the limit is exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
messageNo++;

SendDiskStatusResponse();
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event == nullptr);

// SudDomain quota available - second message will be processed..
SendSubDomainStatus(false);
SendDiskStatusResponse();

event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus());
TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(0), false);
}

Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture)
{
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300);

CreatePartition({
.Partition=TPartitionId{1},
.Begin=0, .End=10,
//
// partition configuration
//
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
},
//
// tablet configuration
//
{.Version=2, .Consumers={{.Consumer="client-1"}}});

SendSubDomainStatus(true);

ui64 cookie = 1;
ui64 messageNo = 0;

SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
UNIT_ASSERT(ownerEvent != nullptr);
auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();

TAutoPtr<IEventHandle> handle;
std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; };

TString data = "data for write";

// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true);
messageNo++;
SendDiskStatusResponse();
{
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
}

// Second message will not be processed because the limit is exceeded.
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true);
messageNo++;

SendDiskStatusResponse();
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event == nullptr);

// SudDomain quota available - second message will be processed..
SendSubDomainStatus(false);
SendDiskStatusResponse();

event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
UNIT_ASSERT(event != nullptr);
UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus());
TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(300), true);
}

Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {
Expand Down

0 comments on commit d227d31

Please sign in to comment.