From a34e3cdfd0df7caaa85eabfdf7c931e2b7ddbdcf Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Thu, 21 Nov 2024 09:22:40 +0000 Subject: [PATCH] Fixed sensor -> PendingPartitions --- .../fq/libs/row_dispatcher/coordinator.cpp | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp index 2349dee6aba0..04719a3924d0 100644 --- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp +++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp @@ -28,13 +28,13 @@ struct TCoordinatorMetrics { : Counters(counters) { IncomingRequests = Counters->GetCounter("IncomingRequests", true); LeaderChangedCount = Counters->GetCounter("LeaderChangedCount"); - PendingReadActors = Counters->GetCounter("PendingReadActors"); + PendingPartitions = Counters->GetCounter("PendingPartitions"); } ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests; ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount; - ::NMonitoring::TDynamicCounters::TCounterPtr PendingReadActors; + ::NMonitoring::TDynamicCounters::TCounterPtr PendingPartitions; }; class TActorCoordinator : public TActorBootstrapped { @@ -85,6 +85,7 @@ class TActorCoordinator : public TActorBootstrapped { const TString Tenant; TMap RowDispatchers; THashMap PartitionLocations; + THashSet PendingPartitions; std::unordered_map> PartitionsCount; // {TopicName -> {NodeId -> NumberPartitions}} std::unordered_map> PendingReadActors; // {TopicName -> {ReadActorId -> CoordinatorRequest}} TCoordinatorMetrics Metrics; @@ -286,6 +287,11 @@ std::optional TActorCoordinator::GetAndUpdateLocation(const TPartition PartitionLocations[key] = bestLocation; rowDispatcherIt->second.Locations.insert(key); topicPartitionsCount[bestLocation.NodeId()]++; + + if (PendingPartitions.erase(key)) { + Metrics.PendingPartitions->Dec(); + } + return bestLocation; } @@ -303,11 +309,12 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt Metrics.IncomingRequests->Inc(); TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record}; - if (!ComputeCoordinatorRequest(ev->Sender, request)) { + if (ComputeCoordinatorRequest(ev->Sender, request)) { + PendingReadActors[source.GetTopicPath()].erase(ev->Sender); + } else { // All nodes are overloaded, add request into pending queue // We save only last request from each read actor PendingReadActors[source.GetTopicPath()][ev->Sender] = request; - Metrics.PendingReadActors->Inc(); } } @@ -316,6 +323,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC Y_ENSURE(!RowDispatchers.empty()); + bool hasPendingPartitions = false; TMap> tmpResult; for (auto& partitionId : request.Record.GetPartitionId()) { TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId}; @@ -327,12 +335,20 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC if (const auto maybeLocation = GetAndUpdateLocation(key)) { rowDispatcherId = *maybeLocation; } else { - return false; + hasPendingPartitions = true; + if (PendingPartitions.insert(key).second) { + Metrics.PendingPartitions->Inc(); + } + continue;; } } tmpResult[rowDispatcherId].insert(partitionId); } + if (hasPendingPartitions) { + return false; + } + auto response = std::make_unique(); for (const auto& [actorId, partitions] : tmpResult) { auto* partitionsProto = response->Record.AddPartitions(); @@ -354,7 +370,6 @@ void TActorCoordinator::UpdatePendingReadActors() { auto& requests = topicIt->second; for (auto requestIt = requests.begin(); requestIt != requests.end();) { if (ComputeCoordinatorRequest(requestIt->first, requestIt->second)) { - Metrics.PendingReadActors->Dec(); requestIt = requests.erase(requestIt); } else { break;