YQ-3890 added per node partition limit (ydb-platform#11830)
GrigoriyPA committed Nov 21, 2024
1 parent edbaa9d commit ee934ab
Showing 2 changed files with 170 additions and 26 deletions.
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -12,6 +12,11 @@ message TRowDispatcherCoordinatorConfig {
TYdbStorageConfig Database = 1;
string CoordinationNodePath = 2;
bool LocalMode = 3; // Use only local row_dispatcher.

// Topic partitions are distributed uniformly across nodes, with at most
// TopicPartitionsLimitPerNode partitions of a topic per node.
// If (number of nodes) * TopicPartitionsLimitPerNode < (number of topic partitions),
// the request will hang indefinitely. The limit is disabled by default (0).
uint64 TopicPartitionsLimitPerNode = 4;
}

message TJsonParserConfig {
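For illustration only, a minimal C++ sketch of enabling the new limit when building a coordinator config. The field name comes from the proto above; the CamelCase setter (mirroring the Get accessor used in coordinator.cpp), the chosen value, and the surrounding code are assumptions, not part of this commit:

// Hypothetical usage sketch (assumes the generated header for row_dispatcher.proto).
// Leaving the field at 0 keeps the previous behaviour, i.e. no per-node limit.
NConfig::TRowDispatcherCoordinatorConfig config;
config.SetTopicPartitionsLimitPerNode(10); // assumed value; tune per deployment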
191 changes: 165 additions & 26 deletions ydb/core/fq/libs/row_dispatcher/coordinator.cpp
@@ -28,12 +28,13 @@ struct TCoordinatorMetrics {
: Counters(counters) {
IncomingRequests = Counters->GetCounter("IncomingRequests", true);
LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode");
}

::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;

::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode;
};

class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
@@ -72,15 +73,79 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
THashSet<TPartitionKey, TPartitionKeyHash> Locations;
};

struct TCoordinatorRequest {
ui64 Cookie;
NRowDispatcherProto::TEvGetAddressRequest Record;
};

struct TTopicInfo {
struct TTopicMetrics {
TTopicMetrics(const TCoordinatorMetrics& metrics, const TString& topicName)
: Counters(metrics.Counters->GetSubgroup("topic", topicName))
{
PendingPartitions = Counters->GetCounter("PendingPartitions");
}

::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr PendingPartitions;
};

struct TNodeMetrics {
TNodeMetrics(const TTopicMetrics& metrics, ui32 nodeId)
: Counters(metrics.Counters->GetSubgroup("node", ToString(nodeId)))
{
PartitionsCount = Counters->GetCounter("PartitionsCount");
}

::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounters::TCounterPtr PartitionsCount;
};

struct TNodeInfo {
ui64 NumberPartitions = 0;
TNodeMetrics Metrics;
};

TTopicInfo(const TCoordinatorMetrics& metrics, const TString& topicName)
: Metrics(metrics, topicName)
{}

void AddPendingPartition(const TPartitionKey& key) {
if (PendingPartitions.insert(key).second) {
Metrics.PendingPartitions->Inc();
}
}

void RemovePendingPartition(const TPartitionKey& key) {
if (PendingPartitions.erase(key)) {
Metrics.PendingPartitions->Dec();
}
}

void IncNodeUsage(ui32 nodeId) {
auto nodeIt = NodesInfo.find(nodeId);
if (nodeIt == NodesInfo.end()) {
nodeIt = NodesInfo.insert({nodeId, TNodeInfo{.NumberPartitions = 0, .Metrics = TNodeMetrics(Metrics, nodeId)}}).first;
}
nodeIt->second.NumberPartitions++;
nodeIt->second.Metrics.PartitionsCount->Inc();
}

THashSet<TPartitionKey, TPartitionKeyHash> PendingPartitions;
THashMap<ui32, TNodeInfo> NodesInfo;
TTopicMetrics Metrics;
};

NConfig::TRowDispatcherCoordinatorConfig Config;
TYqSharedResources::TPtr YqSharedResources;
TActorId LocalRowDispatcherId;
const TString LogPrefix;
const TString Tenant;
TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
THashMap<TString, TTopicInfo> TopicsInfo;
std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors;
TCoordinatorMetrics Metrics;
ui64 LocationRandomCounter = 0;
THashSet<TActorId> InterconnectSessions;

public:
@@ -116,7 +181,10 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {

void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
void PrintInternalState();
NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
TTopicInfo& GetOrCreateTopicInfo(const TString& topicName);
std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key); // std::nullopt if TopicPartitionsLimitPerNode reached
bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
void UpdatePendingReadActors();
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
};

@@ -131,7 +199,9 @@ TActorCoordinator::TActorCoordinator(
, LocalRowDispatcherId(localRowDispatcherId)
, LogPrefix("Coordinator: ")
, Tenant(tenant)
, Metrics(counters) {
, Metrics(counters)
{
Metrics.PartitionsLimitPerNode->Set(Config.GetTopicPartitionsLimitPerNode());
AddRowDispatcher(localRowDispatcherId, true);
}

@@ -145,6 +215,7 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
auto it = RowDispatchers.find(actorId);
if (it != RowDispatchers.end()) {
it->second.Connected = true;
UpdatePendingReadActors();
return;
}

@@ -161,10 +232,12 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
auto node = RowDispatchers.extract(oldActorId);
node.key() = actorId;
RowDispatchers.insert(std::move(node));
UpdatePendingReadActors();
return;
}

RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal});
UpdatePendingReadActors();
}

void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
@@ -197,8 +270,14 @@ void TActorCoordinator::PrintInternalState() {

str << "\nLocations:\n";
for (auto& [key, actorId] : PartitionLocations) {
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
}

str << "\nPending partitions:\n";
for (const auto& [topicName, topicInfo] : TopicsInfo) {
str << " " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
}

LOG_ROW_DISPATCHER_DEBUG(str.Str());
}

@@ -237,31 +316,57 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
Metrics.LeaderChangedCount->Inc();
}

NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) {
const auto it = TopicsInfo.find(topicName);
if (it != TopicsInfo.end()) {
return it->second;
}
return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second;
}

std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
Y_ENSURE(!PartitionLocations.contains(key));
auto rand = LocationRandomCounter++ % RowDispatchers.size();

auto it = std::begin(RowDispatchers);
std::advance(it, rand);
auto& topicInfo = GetOrCreateTopicInfo(key.TopicName);

for (size_t i = 0; i < RowDispatchers.size(); ++i) {
auto& info = it->second;
TActorId bestLocation;
ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
for (auto& [location, info] : RowDispatchers) {
if (!info.Connected) {
it++;
if (it == std::end(RowDispatchers)) {
it = std::begin(RowDispatchers);
}
continue;
}
PartitionLocations[key] = it->first;
it->second.Locations.insert(key);
return it->first;

ui64 numberPartitions = 0;
if (const auto it = topicInfo.NodesInfo.find(location.NodeId()); it != topicInfo.NodesInfo.end()) {
numberPartitions = it->second.NumberPartitions;
}

if (!bestLocation || numberPartitions < bestNumberPartitions) {
bestLocation = location;
bestNumberPartitions = numberPartitions;
}
}
Y_ENSURE(bestLocation, "Local row dispatcher should always be connected");

if (Config.GetTopicPartitionsLimitPerNode() > 0 && bestNumberPartitions >= Config.GetTopicPartitionsLimitPerNode()) {
topicInfo.AddPendingPartition(key);
return std::nullopt;
}
Y_ENSURE(false, "Local row dispatcher should always be connected");

auto rowDispatcherIt = RowDispatchers.find(bestLocation);
Y_ENSURE(rowDispatcherIt != RowDispatchers.end(), "Invalid best location");

PartitionLocations[key] = bestLocation;
rowDispatcherIt->second.Locations.insert(key);
topicInfo.IncNodeUsage(bestLocation.NodeId());
topicInfo.RemovePendingPartition(key);

return bestLocation;
}

void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
const auto source = ev->Get()->Record.GetSource();
const auto& source = ev->Get()->Record.GetSource();

UpdateInterconnectSessions(ev->InterconnectSession);

TStringStream str;
@@ -271,22 +376,45 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
}
LOG_ROW_DISPATCHER_DEBUG(str.Str());
Metrics.IncomingRequests->Inc();

TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
if (ComputeCoordinatorRequest(ev->Sender, request)) {
PendingReadActors.erase(ev->Sender);
} else {
// All nodes are overloaded, add request into pending queue
// We save only last request from each read actor
PendingReadActors[ev->Sender] = request;
}
}

bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request) {
const auto& source = request.Record.GetSource();

Y_ENSURE(!RowDispatchers.empty());

bool hasPendingPartitions = false;
TMap<NActors::TActorId, TSet<ui64>> tmpResult;

for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
for (auto& partitionId : request.Record.GetPartitionId()) {
TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
auto locationIt = PartitionLocations.find(key);
NActors::TActorId rowDispatcherId;
if (locationIt != PartitionLocations.end()) {
rowDispatcherId = locationIt->second;
} else {
rowDispatcherId = GetAndUpdateLocation(key);
if (const auto maybeLocation = GetAndUpdateLocation(key)) {
rowDispatcherId = *maybeLocation;
} else {
hasPendingPartitions = true;
continue;
}
}
tmpResult[rowDispatcherId].insert(partitionId);
}

if (hasPendingPartitions) {
return false;
}

auto response = std::make_unique<TEvRowDispatcher::TEvCoordinatorResult>();
for (const auto& [actorId, partitions] : tmpResult) {
auto* partitionsProto = response->Record.AddPartitions();
@@ -295,12 +423,23 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
partitionsProto->AddPartitionId(partitionId);
}
}
LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);

LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
PrintInternalState();

return true;
}

void TActorCoordinator::UpdatePendingReadActors() {
for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) {
if (ComputeCoordinatorRequest(readActorIt->first, readActorIt->second)) {
readActorIt = PendingReadActors.erase(readActorIt);
} else {
++readActorIt;
}
}
}

} // namespace
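
As a reading aid, here is a self-contained sketch (an assumption-level illustration, not code from this commit) of the balancing rule the new GetAndUpdateLocation implements: among connected row dispatchers, pick the node that currently hosts the fewest partitions of the topic; if even that node has reached TopicPartitionsLimitPerNode (and the limit is non-zero), the partition is left pending instead of being assigned:

// Standalone illustration of the per-topic, per-node balancing rule.
#include <cstdint>
#include <limits>
#include <optional>
#include <unordered_map>
#include <vector>

struct TNodeState {
    uint32_t NodeId = 0;
    bool Connected = false;
};

// partitionsPerNode counts partitions of ONE topic already placed on each node.
// limitPerNode == 0 means the limit is disabled.
std::optional<uint32_t> PickNodeForPartition(
        const std::vector<TNodeState>& nodes,
        const std::unordered_map<uint32_t, uint64_t>& partitionsPerNode,
        uint64_t limitPerNode) {
    std::optional<uint32_t> best;
    uint64_t bestCount = std::numeric_limits<uint64_t>::max();
    for (const auto& node : nodes) {
        if (!node.Connected) {
            continue; // only connected row dispatchers may receive partitions
        }
        const auto it = partitionsPerNode.find(node.NodeId);
        const uint64_t count = (it == partitionsPerNode.end()) ? 0 : it->second;
        if (!best || count < bestCount) {
            best = node.NodeId;
            bestCount = count;
        }
    }
    if (best && limitPerNode > 0 && bestCount >= limitPerNode) {
        return std::nullopt; // every node is at the limit: leave the partition pending
    }
    return best; // std::nullopt only if no node is connected
}

A new partition therefore goes to the least-loaded connected node for its topic, and requests that cannot be fully satisfied are parked in PendingReadActors until AddRowDispatcher triggers UpdatePendingReadActors.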

