Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization of the algorithm for determining the end of partition processing #12081

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/persqueue/events/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,12 @@ namespace TEvPersQueue {
struct TEvReadingPartitionFinishedRequest : public TEventPB<TEvReadingPartitionFinishedRequest, NKikimrPQ::TEvReadingPartitionFinishedRequest, EvReadingPartitionFinished> {
TEvReadingPartitionFinishedRequest() = default;

TEvReadingPartitionFinishedRequest(const TString& consumer, ui32 partitionId, bool scaleAwareSDK, bool startedReadingFromEndOffset) {
TEvReadingPartitionFinishedRequest(const TString& consumer, ui32 partitionId, bool scaleAwareSDK, bool startedReadingFromEndOffset, ui64 readTimestampMs = 0) {
Record.SetConsumer(consumer);
Record.SetPartitionId(partitionId);
Record.SetScaleAwareSDK(scaleAwareSDK);
Record.SetStartedReadingFromEndOffset(startedReadingFromEndOffset);
Record.SetReadTimestampMs(readTimestampMs);
}
};

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,11 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext

}

result.SetPartitionStatus(PartitionConfig->GetStatus());

result.SetStartOffset(StartOffset);
result.SetEndOffset(EndOffset);
result.SetEndWriteTimestampMs(EndWriteTimestamp.MilliSeconds());

if (filterConsumers) {
for (TString consumer : requiredConsumers) {
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,9 +565,17 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c

for (const auto& partRes : record.GetPartResult()) {
ui32 partitionId = partRes.GetPartition();
if (!PartitionsInfo.contains(partitionId)) {

auto p = PartitionsInfo.find(partitionId);
if (p == PartitionsInfo.end()) {
continue;
}
auto& partitionInfo = p->second;

if (partRes.HasPartitionStatus() && partRes.HasEndWriteTimestampMs()) {
partitionInfo.Status = partRes.GetPartitionStatus();
partitionInfo.EndWriteTimestamp = TInstant::MilliSeconds(partRes.GetEndWriteTimestampMs());
}

if (SplitMergeEnabled(TabletConfig) && PartitionsScaleManager) {
PartitionsScaleManager->HandleScaleStatusChange(partitionId, partRes.GetScaleStatus(), ctx);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/read_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
public:
struct TPartitionInfo {
ui64 TabletId;
NKikimrPQ::ETopicPartitionStatus Status = NKikimrPQ::ETopicPartitionStatus::Active;
TInstant EndWriteTimestamp;
};

private:
Expand Down
18 changes: 17 additions & 1 deletion ydb/core/persqueue/read_balancer__balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,23 @@ void TConsumer::FinishReading(TEvPersQueue::TEvReadingPartitionFinishedRequest::
ScheduleBalance(ctx);
}
} else if (!partition.IsInactive()) {
auto delay = std::min<size_t>(1ul << partition.Iteration, Balancer.GetLifetimeSeconds()); // TODO use split/merge time
auto now = TInstant::Now();

auto* partitionInfo = GetPartitionInfo(partitionId);
bool hasEndWriteTimestamp = partitionInfo && partitionInfo->Status == NKikimrPQ::ETopicPartitionStatus::Inactive;
auto endWriteTimestamp = hasEndWriteTimestamp ? partitionInfo->EndWriteTimestamp : now;

size_t delay;
if (r.GetReadTimestampMs() && hasEndWriteTimestamp) {
TInstant readTimestamp = TInstant::MilliSeconds(r.GetReadTimestampMs());
if (readTimestamp >= endWriteTimestamp) {
delay = 1;
} else {
delay = (endWriteTimestamp - readTimestamp).Seconds() + 1;
}
} else {
delay = std::min<size_t>(1ul << partition.Iteration, Balancer.GetLifetimeSeconds() - (now - endWriteTimestamp).Seconds() + 1);
}

PQ_LOG_D("Reading of the partition " << partitionId << " was finished by " << r.GetConsumer()
<< ". Scheduled release of the partition for re-reading. Delay=" << delay << " seconds,"
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,8 @@ message TStatusResponse {
optional uint64 Cookie = 34;

optional EScaleStatus ScaleStatus = 35;
optional ETopicPartitionStatus PartitionStatus = 36;
optional uint64 EndWriteTimestampMs = 37;
}

message TConsumerResult {
Expand Down Expand Up @@ -1072,6 +1074,7 @@ message TEvReadingPartitionFinishedRequest {
optional uint32 PartitionId = 2;
optional bool ScaleAwareSDK = 3;
optional bool StartedReadingFromEndOffset = 4;
optional uint64 ReadTimestampMs = 5; //optional, default = 0
};

// The consumer's reading of the partition is started (from ReadSession)
Expand Down
7 changes: 6 additions & 1 deletion ydb/services/persqueue_v1/actors/read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2333,8 +2333,13 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReadingFinis
return;
}

auto [_, maxLag, readTimestampMs] = GetReadFrom(it->second.FullConverter, ctx);
if (!readTimestampMs && maxLag) {
readTimestampMs = (TInstant::Now() - TDuration::MilliSeconds(maxLag)).MilliSeconds();
}

auto& topic = it->second;
NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(ClientId, msg->PartitionId, AutoPartitioningSupport, msg->FirstMessage));
NTabletPipe::SendData(ctx, topic.PipeClient, new TEvPersQueue::TEvReadingPartitionFinishedRequest(ClientId, msg->PartitionId, AutoPartitioningSupport, msg->FirstMessage, readTimestampMs));

if constexpr (!UseMigrationProtocol) {
if (AutoPartitioningSupport) {
Expand Down
Loading