Skip to content

Commit

Permalink
ut
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Dec 4, 2024
1 parent f0d474c commit c9241c4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 5 deletions.
10 changes: 5 additions & 5 deletions ydb/core/persqueue/read_balancer__balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ bool IsRoot(const TPartitionGraph::Node* node, const std::unordered_set<ui32>& p
if (node->IsRoot()) {
return true;
}
for (auto* p : node->Parents) {
for (auto* p : node->DirectParents) {
if (partitions.contains(p->Id)) {
return false;
}
Expand Down Expand Up @@ -966,10 +966,10 @@ bool TConsumer::IsReadable(ui32 partitionId) {
}

if (Partitions.empty()) {
return node->Parents.empty();
return node->DirectParents.empty();
}

for(auto* parent : node->HierarhicalParents) {
for(auto* parent : node->AllParents) {
if (!IsInactive(parent->Id)) {
return false;
}
Expand Down Expand Up @@ -1035,9 +1035,9 @@ bool TConsumer::ProccessReadingFinished(ui32 partitionId, bool wasInactive, cons
if (family->CanAttach(std::vector{id})) {
auto* node = GetPartitionGraph().GetPartition(id);
bool allParentsMerged = true;
if (node->Parents.size() > 1) {
if (node->DirectParents.size() > 1) {
// The partition was obtained as a result of the merge.
for (auto* c : node->Parents) {
for (auto* c : node->DirectParents) {
auto* other = FindFamily(c->Id);
if (!other) {
allParentsMerged = false;
Expand Down
107 changes: 107 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,113 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
UNIT_ASSERT(stats1->GetCommittedOffset() == 4);
}

Y_UNIT_TEST(PartitionSplit_DistributedTxCommit_CheckSessionReset) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(1)
.MaxActivePartitions(100)
.BeginConfigureAutoPartitioningSettings()
.UpUtilizationPercent(2)
.DownUtilizationPercent(1)
.StabilizationWindow(TDuration::Seconds(2))
.Strategy(EAutoPartitioningStrategy::ScaleUp)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings()
.BeginAddConsumer()
.ConsumerName(TEST_CONSUMER);

client.CreateTopic(TEST_TOPIC, createSettings).Wait();

auto msg = TString(1_MB, 'a');

auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TEST_TOPIC, false);
auto seqNo = 1;
{
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}

{
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_1->Write(Msg(msg, seqNo++)));
UNIT_ASSERT(writeSession_2->Write(Msg(msg, seqNo++)));
Sleep(TDuration::Seconds(5));
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
}

auto writeSession_3 = CreateWriteSession(client, "producer-2", 1, TEST_TOPIC, false);
UNIT_ASSERT(writeSession_3->Write(Msg("message", seqNo++)));
UNIT_ASSERT(writeSession_3->Write(Msg("message", seqNo++)));

auto reader = client.CreateReadSession(
TReadSessionSettings()
.AutoPartitioningSupport(true)
.AppendTopics(TTopicReadSettings(TEST_TOPIC))
.ConsumerName(TEST_CONSUMER));

TInstant deadlineTime = TInstant::Now() + TDuration::Seconds(15);

auto commitSent = false;

while(deadlineTime > TInstant::Now()) {
for (auto event : reader->GetEvents(false)) {
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
auto& messages = x->GetMessages();
for (size_t i = 0u; i < messages.size(); ++i) {
auto& message = messages[i];
message.Commit();

if (message.GetSeqNo() == 8) {
if (!commitSent) {
commitSent = true;
Sleep(TDuration::MilliSeconds(300));
auto status = client.CommitOffset(TEST_TOPIC, 1, TEST_CONSUMER, 0).GetValueSync();
UNIT_ASSERT(status.IsSuccess());
} else {
return;
}
}
}
UNIT_ASSERT(writeSession_3->Write(Msg(TStringBuilder() << "message-" << seqNo, seqNo)));
seqNo++;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
x->Confirm();
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
x->Confirm();
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
x->Confirm();
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
} else if (auto* sessionClosedEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
Cerr << sessionClosedEvent->DebugString() << Endl << Flush;
} else {
Cerr << "SESSION EVENT unhandled \n";
}
}
Sleep(TDuration::MilliSeconds(250));
}

UNIT_ASSERT(false);
}

Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();
Expand Down

0 comments on commit c9241c4

Please sign in to comment.