From 8d54720519a3414f357b2bcdbd7990f9f0fe0917 Mon Sep 17 00:00:00 2001
From: Gigoriy Pisarenko
Date: Thu, 21 Nov 2024 07:49:55 +0000
Subject: [PATCH] Added per-node partitions limit

---
 .../libs/config/protos/row_dispatcher.proto |   5 +
 .../fq/libs/row_dispatcher/coordinator.cpp  | 112 ++++++++++++++----
 2 files changed, 93 insertions(+), 24 deletions(-)

diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
index e4f5d180cb7a..2ee16c93ddef 100644
--- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto
+++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -12,6 +12,11 @@ message TRowDispatcherCoordinatorConfig {
     TYdbStorageConfig Database = 1;
     string CoordinationNodePath = 2;
     bool LocalMode = 3; // Use only local row_dispatcher.
+
+    // Topic partitions are distributed uniformly across nodes, at most TopicPartitionsLimitPerNode per node.
+    // If (number of nodes) * TopicPartitionsLimitPerNode < (number of topic partitions),
+    // the excess requests stay pending indefinitely. Disabled by default (0 means no limit).
+    uint64 TopicPartitionsLimitPerNode = 4;
 }
 
 message TJsonParserConfig {
diff --git a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
index dd5b4dca5dea..8b818156ecbe 100644
--- a/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -28,12 +28,13 @@ struct TCoordinatorMetrics {
         : Counters(counters) {
         IncomingRequests = Counters->GetCounter("IncomingRequests", true);
         LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
+        PendingReadActors = Counters->GetCounter("PendingReadActors");
     }
     ::NMonitoring::TDynamicCounterPtr Counters;
     ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
     ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
-
+    ::NMonitoring::TDynamicCounters::TCounterPtr PendingReadActors;
 };
 
 class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
@@ -72,6 +73,11 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
        THashSet<TPartitionKey, TPartitionKeyHash> Locations;
    };

+    struct TCoordinatorRequest {
+        ui64 Cookie;
+        NRowDispatcherProto::TEvGetAddressRequest Record;
+    };
+
     NConfig::TRowDispatcherCoordinatorConfig Config;
     TYqSharedResources::TPtr YqSharedResources;
     TActorId LocalRowDispatcherId;
@@ -79,8 +85,9 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
     const TString Tenant;
     TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
     THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
+    std::unordered_map<TString, std::unordered_map<ui32, ui64>> PartitionsCount;                       // {TopicName -> {NodeId -> NumberPartitions}}
+    std::unordered_map<TString, std::unordered_map<TActorId, TCoordinatorRequest>> PendingReadActors;  // {TopicName -> {ReadActorId -> CoordinatorRequest}}
     TCoordinatorMetrics Metrics;
-    ui64 LocationRandomCounter = 0;
     THashSet<TActorId> InterconnectSessions;
 
 public:
@@ -116,7 +123,9 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
     void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
     void PrintInternalState();
-    NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
+    std::optional<NActors::TActorId> GetAndUpdateLocation(const TPartitionKey& key);  // std::nullopt if TopicPartitionsLimitPerNode reached
+    bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
+    void UpdatePendingReadActors();
     void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
 };
 
@@ -199,6 +208,12 @@ void TActorCoordinator::PrintInternalState() {
     for (auto& [key, actorId] : PartitionLocations) {
         str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
     }
+
+    str << "\nPending partitions:\n";
+    for (const auto& [topicName, requests] : PendingReadActors) {
+        str << " " << topicName << ", pending read actors: " << requests.size() << "\n";
+    }
+
     LOG_ROW_DISPATCHER_DEBUG(str.Str());
 }
 
@@ -237,31 +252,43 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPt
     Metrics.LeaderChangedCount->Inc();
 }
 
-NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
+std::optional<NActors::TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
     Y_ENSURE(!PartitionLocations.contains(key));
-    auto rand = LocationRandomCounter++ % RowDispatchers.size();
-    auto it = std::begin(RowDispatchers);
-    std::advance(it, rand);
+    auto& topicPartitionsCount = PartitionsCount[key.TopicName];
 
-    for (size_t i = 0; i < RowDispatchers.size(); ++i) {
-        auto& info = it->second;
+    TActorId bestLocation;
+    ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
+    for (auto& [location, info] : RowDispatchers) {
         if (!info.Connected) {
-            it++;
-            if (it == std::end(RowDispatchers)) {
-                it = std::begin(RowDispatchers);
-            }
             continue;
         }
-        PartitionLocations[key] = it->first;
-        it->second.Locations.insert(key);
-        return it->first;
+
+        ui64 numberPartitions = 0;
+        if (const auto it = topicPartitionsCount.find(location.NodeId()); it != topicPartitionsCount.end()) {
+            numberPartitions = it->second;
+        }
+
+        if (!bestLocation || numberPartitions < bestNumberPartitions) {
+            bestLocation = location;
+            bestNumberPartitions = numberPartitions;
+        }
+    }
+    Y_ENSURE(bestLocation, "Local row dispatcher should always be connected");
+
+    if (Config.GetTopicPartitionsLimitPerNode() > 0 && bestNumberPartitions >= Config.GetTopicPartitionsLimitPerNode()) {
+        return std::nullopt;
     }
-    Y_ENSURE(false, "Local row dispatcher should always be connected");
+
+    PartitionLocations[key] = bestLocation;
+    RowDispatchers[bestLocation].Locations.insert(key);
+    topicPartitionsCount[bestLocation.NodeId()]++;
+    return bestLocation;
 }
 
 void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
-    const auto source = ev->Get()->Record.GetSource();
+    const auto& source = ev->Get()->Record.GetSource();
+
     UpdateInterconnectSessions(ev->InterconnectSession);
 
     TStringStream str;
@@ -271,18 +298,34 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
     }
     LOG_ROW_DISPATCHER_DEBUG(str.Str());
     Metrics.IncomingRequests->Inc();
+
+    TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
+    if (!ComputeCoordinatorRequest(ev->Sender, request)) {
+        // All nodes are overloaded, add the request to the pending queue
+        // We keep only the last request from each read actor
+        PendingReadActors[source.GetTopicPath()][ev->Sender] = request;
+        Metrics.PendingReadActors->Inc();
+    }
+}
+
+bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request) {
+    const auto& source = request.Record.GetSource();
 
     Y_ENSURE(!RowDispatchers.empty());
     TMap<NActors::TActorId, TSet<ui64>> tmpResult;
 
-    for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
+    for (auto& partitionId : request.Record.GetPartitionId()) {
         TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
         auto locationIt = PartitionLocations.find(key);
         NActors::TActorId rowDispatcherId;
         if (locationIt != PartitionLocations.end()) {
             rowDispatcherId = locationIt->second;
         } else {
-            rowDispatcherId = GetAndUpdateLocation(key);
+            if (const auto maybeLocation = GetAndUpdateLocation(key)) {
+                rowDispatcherId = *maybeLocation;
+            } else {
+                return false;
+            }
         }
         tmpResult[rowDispatcherId].insert(partitionId);
     }
@@ -295,12 +338,33 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
             partitionsProto->AddPartitionId(partitionId);
         }
     }
-
-    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
-    Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
+
+    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
+    Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
     PrintInternalState();
+
+    return true;
 }
+void TActorCoordinator::UpdatePendingReadActors() {
+    for (auto topicIt = PendingReadActors.begin(); topicIt != PendingReadActors.end();) {
+        auto& requests = topicIt->second;
+        for (auto requestIt = requests.begin(); requestIt != requests.end();) {
+            if (ComputeCoordinatorRequest(requestIt->first, requestIt->second)) {
+                Metrics.PendingReadActors->Dec();
+                requestIt = requests.erase(requestIt);
+            } else {
+                break;
+            }
+        }
+
+        if (requests.empty()) {
+            topicIt = PendingReadActors.erase(topicIt);
+        } else {
+            ++topicIt;
+        }
+    }
+}
 
 } // namespace
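
Note (not part of the patch): the placement rule the diff implements is "pick the connected node that currently holds the fewest partitions of this topic; if even that node has reached TopicPartitionsLimitPerNode, report failure so the request can be parked and retried later". The following is a minimal standalone C++ sketch of that rule only; the names (PickNode, NodeId, kLimitPerNode) and the simplified containers are illustrative assumptions, not the coordinator's actual types or API.

// Standalone illustration of least-loaded placement with a per-node cap.
#include <cstdint>
#include <iostream>
#include <limits>
#include <map>
#include <optional>

using NodeId = std::uint32_t;
// Partitions already assigned per node for one topic.
using TopicLoad = std::map<NodeId, std::uint64_t>;

// Returns the least-loaded connected node, or std::nullopt when every connected
// node already holds `limitPerNode` partitions (0 means "no limit") -- the same
// signal GetAndUpdateLocation uses to let the coordinator queue the request.
std::optional<NodeId> PickNode(const std::map<NodeId, bool>& connected,
                               TopicLoad& load,
                               std::uint64_t limitPerNode) {
    std::optional<NodeId> best;
    std::uint64_t bestCount = std::numeric_limits<std::uint64_t>::max();
    for (const auto& [node, isConnected] : connected) {
        if (!isConnected) {
            continue;
        }
        const auto it = load.find(node);
        const std::uint64_t count = (it == load.end()) ? 0 : it->second;
        if (!best || count < bestCount) {
            best = node;
            bestCount = count;
        }
    }
    if (!best) {
        return std::nullopt;  // no connected node at all
    }
    if (limitPerNode > 0 && bestCount >= limitPerNode) {
        return std::nullopt;  // every connected node is at the cap
    }
    ++load[*best];            // record the new assignment
    return best;
}

int main() {
    std::map<NodeId, bool> connected = {{1, true}, {2, true}, {3, false}};
    TopicLoad load;
    const std::uint64_t kLimitPerNode = 2;  // plays the role of TopicPartitionsLimitPerNode

    // With a cap of 2 and two connected nodes, only 4 partitions fit;
    // the 5th request gets std::nullopt and would stay pending.
    for (int partition = 0; partition < 5; ++partition) {
        if (const auto node = PickNode(connected, load, kLimitPerNode)) {
            std::cout << "partition " << partition << " -> node " << *node << "\n";
        } else {
            std::cout << "partition " << partition << " -> pending\n";
        }
    }
    return 0;
}

The chooser itself never blocks; it only reports that no node has capacity. That is why the patch needs the separate PendingReadActors queue and UpdatePendingReadActors pass to retry parked requests once partitions are freed.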