Commit

Added per node partitions limit
GrigoriyPA committed Nov 21, 2024
1 parent 41baadd commit 8d54720
Showing 2 changed files with 93 additions and 24 deletions.
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -12,6 +12,11 @@ message TRowDispatcherCoordinatorConfig {
     TYdbStorageConfig Database = 1;
     string CoordinationNodePath = 2;
     bool LocalMode = 3; // Use only local row_dispatcher.
+
+    // Topic partitions are distributed uniformly across nodes, up to
+    // TopicPartitionsLimitPerNode partitions of one topic per node. If
+    // (number of nodes) * TopicPartitionsLimitPerNode < (number of topic partitions),
+    // the excess requests hang indefinitely. Disabled by default (zero means no limit).
+    uint64 TopicPartitionsLimitPerNode = 4;
 }

 message TJsonParserConfig {
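Note: the new field caps how many partitions of one topic may be placed on a single node. With 3 nodes and TopicPartitionsLimitPerNode = 5, for example, at most 3 * 5 = 15 partitions of a topic can be assigned; a 20-partition topic leaves 5 partitions without a node, and, as the coordinator changes below show, the corresponding requests wait in a pending queue instead of failing. A hypothetical text-proto fragment setting the field (the field name comes from the message above; where it sits in the full config is illustrative):

    TopicPartitionsLimitPerNode: 5
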
112 changes: 88 additions & 24 deletions ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -28,12 +28,13 @@ struct TCoordinatorMetrics {
         : Counters(counters) {
         IncomingRequests = Counters->GetCounter("IncomingRequests", true);
         LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
+        PendingReadActors = Counters->GetCounter("PendingReadActors");
     }

     ::NMonitoring::TDynamicCounterPtr Counters;
     ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
     ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
-
+    ::NMonitoring::TDynamicCounters::TCounterPtr PendingReadActors;
 };

 class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
@@ -72,15 +73,21 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
         THashSet<TPartitionKey, TPartitionKeyHash> Locations;
     };

+    struct TCoordinatorRequest {
+        ui64 Cookie;
+        NRowDispatcherProto::TEvGetAddressRequest Record;
+    };
+
     NConfig::TRowDispatcherCoordinatorConfig Config;
     TYqSharedResources::TPtr YqSharedResources;
     TActorId LocalRowDispatcherId;
     const TString LogPrefix;
     const TString Tenant;
     TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
     THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
+    std::unordered_map<TString, std::unordered_map<ui32, ui64>> PartitionsCount;        // {TopicName -> {NodeId -> NumberPartitions}}
+    std::unordered_map<TString, std::unordered_map<TActorId, TCoordinatorRequest>> PendingReadActors;   // {TopicName -> {ReadActorId -> CoordinatorRequest}}
     TCoordinatorMetrics Metrics;
-    ui64 LocationRandomCounter = 0;
     THashSet<TActorId> InterconnectSessions;

 public:
@@ -116,7 +123,9 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {

     void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
     void PrintInternalState();
-    NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
+    std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key);   // std::nullopt if TopicPartitionsLimitPerNode is reached
+    bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
+    void UpdatePendingReadActors();
     void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
 };

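Note: PartitionsCount records, per topic, how many partitions each node currently hosts (the input to the balancing decision below), while PendingReadActors keeps, per topic, the latest unanswered request from each read actor together with the cookie needed to reply to it later.
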
@@ -199,6 +208,12 @@ void TActorCoordinator::PrintInternalState() {
     for (auto& [key, actorId] : PartitionLocations) {
         str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
     }
+
+    str << "\nPending partitions:\n";
+    for (const auto& [topicName, requests] : PendingReadActors) {
+        str << " " << topicName << ", pending read actors: " << requests.size() << "\n";
+    }
+
     LOG_ROW_DISPATCHER_DEBUG(str.Str());
 }

@@ -237,31 +252,43 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
     Metrics.LeaderChangedCount->Inc();
 }

-NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
+std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
     Y_ENSURE(!PartitionLocations.contains(key));
-    auto rand = LocationRandomCounter++ % RowDispatchers.size();

-    auto it = std::begin(RowDispatchers);
-    std::advance(it, rand);
+    auto& topicPartitionsCount = PartitionsCount[key.TopicName];
+
+    TActorId bestLocation;
+    ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
-    for (size_t i = 0; i < RowDispatchers.size(); ++i) {
-        auto& info = it->second;
+    for (auto& [location, info] : RowDispatchers) {
         if (!info.Connected) {
-            it++;
-            if (it == std::end(RowDispatchers)) {
-                it = std::begin(RowDispatchers);
-            }
             continue;
         }
-        PartitionLocations[key] = it->first;
-        it->second.Locations.insert(key);
-        return it->first;
+
+        ui64 numberPartitions = 0;
+        if (const auto it = topicPartitionsCount.find(location.NodeId()); it != topicPartitionsCount.end()) {
+            numberPartitions = it->second;
+        }
+
+        if (!bestLocation || numberPartitions < bestNumberPartitions) {
+            bestLocation = location;
+            bestNumberPartitions = numberPartitions;
+        }
     }
+    Y_ENSURE(bestLocation, "Local row dispatcher should always be connected");
+
+    if (Config.GetTopicPartitionsLimitPerNode() > 0 && bestNumberPartitions >= Config.GetTopicPartitionsLimitPerNode()) {
+        return std::nullopt;
+    }
-    Y_ENSURE(false, "Local row dispatcher should always be connected");
+
+    PartitionLocations[key] = bestLocation;
+    RowDispatchers[bestLocation].Locations.insert(key);
+    topicPartitionsCount[bestLocation.NodeId()]++;
+    return bestLocation;
 }

 void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
-    const auto source = ev->Get()->Record.GetSource();
+    const auto& source = ev->Get()->Record.GetSource();

     UpdateInterconnectSessions(ev->InterconnectSession);

     TStringStream str;
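Note: the rewrite above replaces the old round-robin placement (via LocationRandomCounter) with a least-loaded choice. A minimal, self-contained sketch of that policy (illustrative names and plain std containers, not part of the commit): pick the connected node that currently hosts the fewest partitions of the topic, and refuse once every connected node has reached the per-node limit.

// Sketch only; PickNode and the container layout are assumptions for illustration.
#include <cstdint>
#include <iostream>
#include <limits>
#include <map>
#include <optional>

std::optional<uint32_t> PickNode(const std::map<uint32_t, bool>& nodes,            // NodeId -> connected
                                 std::map<uint32_t, uint64_t>& partitionsPerNode,  // NodeId -> partitions of this topic
                                 uint64_t limitPerNode) {                          // 0 means no limit
    std::optional<uint32_t> best;
    uint64_t bestCount = std::numeric_limits<uint64_t>::max();
    for (const auto& [nodeId, connected] : nodes) {
        if (!connected) {
            continue;
        }
        const uint64_t count = partitionsPerNode[nodeId];  // missing entry counts as 0
        if (count < bestCount) {
            best = nodeId;
            bestCount = count;
        }
    }
    if (!best || (limitPerNode > 0 && bestCount >= limitPerNode)) {
        return std::nullopt;  // no connected node, or all of them are at the limit
    }
    ++partitionsPerNode[*best];  // mirrors topicPartitionsCount[...]++ in the commit
    return best;
}

int main() {
    std::map<uint32_t, bool> nodes{{1, true}, {2, true}};
    std::map<uint32_t, uint64_t> counts;
    for (uint32_t partition = 0; partition < 5; ++partition) {
        if (const auto node = PickNode(nodes, counts, /*limitPerNode=*/2)) {
            std::cout << "partition " << partition << " -> node " << *node << "\n";
        } else {
            std::cout << "partition " << partition << " -> pending\n";  // every node at the limit
        }
    }
}

With two nodes and a limit of 2, partitions 0-3 alternate between the nodes and partition 4 comes back as pending, which corresponds to the std::nullopt branch in the commit.
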
@@ -271,18 +298,34 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
     }
     LOG_ROW_DISPATCHER_DEBUG(str.Str());
     Metrics.IncomingRequests->Inc();
+
+    TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
+    if (!ComputeCoordinatorRequest(ev->Sender, request)) {
+        // All nodes are overloaded, so park the request in the pending queue.
+        // Only the last request from each read actor is kept.
+        PendingReadActors[source.GetTopicPath()][ev->Sender] = request;
+        Metrics.PendingReadActors->Inc();
+    }
+}
+
+bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request) {
+    const auto& source = request.Record.GetSource();
+
     Y_ENSURE(!RowDispatchers.empty());

     TMap<NActors::TActorId, TSet<ui64>> tmpResult;

-    for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
+    for (auto& partitionId : request.Record.GetPartitionId()) {
         TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
         auto locationIt = PartitionLocations.find(key);
         NActors::TActorId rowDispatcherId;
         if (locationIt != PartitionLocations.end()) {
             rowDispatcherId = locationIt->second;
         } else {
-            rowDispatcherId = GetAndUpdateLocation(key);
+            if (const auto maybeLocation = GetAndUpdateLocation(key)) {
+                rowDispatcherId = *maybeLocation;
+            } else {
+                return false;
+            }
         }
         tmpResult[rowDispatcherId].insert(partitionId);
     }
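Note: when any partition in a request cannot be placed, ComputeCoordinatorRequest abandons the whole request and the handler above parks it. Locations already assigned earlier in the loop stay recorded in PartitionLocations, so a later retry resolves those partitions through the locationIt branch and only the remainder goes through GetAndUpdateLocation again.
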
@@ -295,12 +338,33 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
             partitionsProto->AddPartitionId(partitionId);
         }
     }
-    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
-    Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
+
+    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
+    Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
     PrintInternalState();
+
+    return true;
 }
+
+void TActorCoordinator::UpdatePendingReadActors() {
+    for (auto topicIt = PendingReadActors.begin(); topicIt != PendingReadActors.end();) {
+        auto& requests = topicIt->second;
+        for (auto requestIt = requests.begin(); requestIt != requests.end();) {
+            if (ComputeCoordinatorRequest(requestIt->first, requestIt->second)) {
+                Metrics.PendingReadActors->Dec();
+                requestIt = requests.erase(requestIt);
+            } else {
+                break;
+            }
+        }
+
+        if (requests.empty()) {
+            topicIt = PendingReadActors.erase(topicIt);
+        } else {
+            ++topicIt;
+        }
+    }
+}
+
 } // namespace

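Note: UpdatePendingReadActors replays parked requests topic by topic; each request that now fits is answered, decrements the PendingReadActors gauge, and is erased, and empty topic buckets are dropped. The inner break stops scanning a topic at the first request that still cannot be placed. Presumably the coordinator calls this whenever placement capacity grows (for example, when a row dispatcher connects); the call sites are outside the hunks shown here.
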
