Skip to content

Commit

Permalink
Fixed sensor -> PendingPartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 21, 2024
1 parent 74e30f2 commit a34e3cd
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ struct TCoordinatorMetrics {
: Counters(counters) {
IncomingRequests = Counters->GetCounter("IncomingRequests", true);
LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
PendingReadActors = Counters->GetCounter("PendingReadActors");
PendingPartitions = Counters->GetCounter("PendingPartitions");
}

::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
::NMonitoring::TDynamicCounters::TCounterPtr PendingReadActors;
::NMonitoring::TDynamicCounters::TCounterPtr PendingPartitions;
};

class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
Expand Down Expand Up @@ -85,6 +85,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
const TString Tenant;
TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
THashSet<TPartitionKey, TPartitionKeyHash> PendingPartitions;
std::unordered_map<TString, std::unordered_map<ui32, ui64>> PartitionsCount; // {TopicName -> {NodeId -> NumberPartitions}}
std::unordered_map<TString, std::unordered_map<TActorId, TCoordinatorRequest>> PendingReadActors; // {TopicName -> {ReadActorId -> CoordinatorRequest}}
TCoordinatorMetrics Metrics;
Expand Down Expand Up @@ -286,6 +287,11 @@ std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartition
PartitionLocations[key] = bestLocation;
rowDispatcherIt->second.Locations.insert(key);
topicPartitionsCount[bestLocation.NodeId()]++;

if (PendingPartitions.erase(key)) {
Metrics.PendingPartitions->Dec();
}

return bestLocation;
}

Expand All @@ -303,11 +309,12 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
Metrics.IncomingRequests->Inc();

TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
if (!ComputeCoordinatorRequest(ev->Sender, request)) {
if (ComputeCoordinatorRequest(ev->Sender, request)) {
PendingReadActors[source.GetTopicPath()].erase(ev->Sender);
} else {
// All nodes are overloaded, add request into pending queue
// We save only last request from each read actor
PendingReadActors[source.GetTopicPath()][ev->Sender] = request;
Metrics.PendingReadActors->Inc();
}
}

Expand All @@ -316,6 +323,7 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC

Y_ENSURE(!RowDispatchers.empty());

bool hasPendingPartitions = false;
TMap<NActors::TActorId, TSet<ui64>> tmpResult;
for (auto& partitionId : request.Record.GetPartitionId()) {
TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
Expand All @@ -327,12 +335,20 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
if (const auto maybeLocation = GetAndUpdateLocation(key)) {
rowDispatcherId = *maybeLocation;
} else {
return false;
hasPendingPartitions = true;
if (PendingPartitions.insert(key).second) {
Metrics.PendingPartitions->Inc();
}
continue;;
}
}
tmpResult[rowDispatcherId].insert(partitionId);
}

if (hasPendingPartitions) {
return false;
}

auto response = std::make_unique<TEvRowDispatcher::TEvCoordinatorResult>();
for (const auto& [actorId, partitions] : tmpResult) {
auto* partitionsProto = response->Record.AddPartitions();
Expand All @@ -354,7 +370,6 @@ void TActorCoordinator::UpdatePendingReadActors() {
auto& requests = topicIt->second;
for (auto requestIt = requests.begin(); requestIt != requests.end();) {
if (ComputeCoordinatorRequest(requestIt->first, requestIt->second)) {
Metrics.PendingReadActors->Dec();
requestIt = requests.erase(requestIt);
} else {
break;
Expand Down

0 comments on commit a34e3cd

Please sign in to comment.