Skip to content

Commit

Permalink
YQ WM fixed race with actor context (ydb-platform#11576)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Nov 14, 2024
1 parent 071fc9a commit 6dac5e0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
6 changes: 3 additions & 3 deletions ydb/core/kqp/workload_service/kqp_workload_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
(ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem
}), IEventHandle::FlagTrackDelivery);

CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(ActorContext(), Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));
CpuQuotaManager = std::make_unique<TCpuQuotaManagerState>(Counters.Counters->GetSubgroup("subcomponent", "CpuQuotaManager"));

EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
Expand Down Expand Up @@ -556,7 +556,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
return &databaseIt->second;
}
LOG_I("Creating new database state for id " << databaseId);
return &DatabaseToState.insert({databaseId, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second;
return &DatabaseToState.insert({databaseId, TDatabaseState{.SelfId = SelfId(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second;
}

TPoolState* GetOrCreatePoolState(const TString& databaseId, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) {
Expand All @@ -568,7 +568,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
LOG_I("Creating new handler for pool " << poolKey);

const auto poolHandler = Register(CreatePoolHandlerActor(databaseId, poolId, poolConfig, EnableResourcePoolsCounters ? Counters.Counters : MakeIntrusive<NMonitoring::TDynamicCounters>()));
const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second;
const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler}}).first->second;

Counters.ActivePools->Inc();
ScheduleIdleCheck();
Expand Down
37 changes: 17 additions & 20 deletions ydb/core/kqp/workload_service/kqp_workload_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ constexpr TDuration IDLE_DURATION = TDuration::Seconds(60);


struct TDatabaseState {
NActors::TActorContext ActorContext;
TActorId SelfId;
bool& EnabledResourcePoolsOnServerless;

std::vector<TEvPlaceRequestIntoPool::TPtr> PendingRequersts = {};
Expand All @@ -33,7 +33,7 @@ struct TDatabaseState {
const TString& poolId = ev->Get()->PoolId;
auto& subscribers = PendingSubscriptions[poolId];
if (subscribers.empty()) {
ActorContext.Register(CreatePoolFetcherActor(ActorContext.SelfID, ev->Get()->DatabaseId, poolId, nullptr));
TActivationContext::Register(CreatePoolFetcherActor(SelfId, ev->Get()->DatabaseId, poolId, nullptr));
}

subscribers.emplace(ev->Sender);
Expand All @@ -45,7 +45,7 @@ struct TDatabaseState {
PendingRequersts.emplace_back(std::move(ev));

if (!EnabledResourcePoolsOnServerless && (TInstant::Now() - LastUpdateTime) > IDLE_DURATION) {
ActorContext.Register(CreateDatabaseFetcherActor(ActorContext.SelfID, DatabaseIdToDatabase(databaseId)));
TActivationContext::Register(CreateDatabaseFetcherActor(SelfId, DatabaseIdToDatabase(databaseId)));
} else if (!DatabaseUnsupported) {
StartPendingRequests();
} else {
Expand All @@ -61,11 +61,11 @@ struct TDatabaseState {
}

if (ev->Get()->Status == Ydb::StatusIds::SUCCESS && poolHandler) {
ActorContext.Send(poolHandler, new TEvPrivate::TEvUpdatePoolSubscription(ev->Get()->PathId, subscribers));
TActivationContext::Send(poolHandler, std::make_unique<TEvPrivate::TEvUpdatePoolSubscription>(ev->Get()->PathId, subscribers));
} else {
const TString& databaseId = ev->Get()->DatabaseId;
for (const auto& subscriber : subscribers) {
ActorContext.Send(subscriber, new TEvUpdatePoolInfo(databaseId, poolId, std::nullopt, std::nullopt));
TActivationContext::Send(subscriber, std::make_unique<TEvUpdatePoolInfo>(databaseId, poolId, std::nullopt, std::nullopt));
}
}
subscribers.clear();
Expand All @@ -79,7 +79,7 @@ struct TDatabaseState {
}

if (Serverless != ev->Get()->Serverless) {
ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvKqp::TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless));
TActivationContext::Send(MakeKqpProxyID(SelfId.NodeId()), std::make_unique<TEvKqp::TEvUpdateDatabaseInfo>(ev->Get()->Database, ev->Get()->DatabaseId, ev->Get()->Serverless));
}

LastUpdateTime = TInstant::Now();
Expand All @@ -103,25 +103,24 @@ struct TDatabaseState {
}

for (auto& ev : PendingRequersts) {
ActorContext.Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool));
TActivationContext::Register(CreatePoolResolverActor(std::move(ev), HasDefaultPool));
}
PendingRequersts.clear();
}

void ReplyContinueError(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
for (const auto& ev : PendingRequersts) {
RemovePendingSession(ev->Get()->SessionId, [this](TEvCleanupRequest::TPtr event) {
ActorContext.Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, {NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
RemovePendingSession(ev->Get()->SessionId, [actorSystem = TActivationContext::ActorSystem()](TEvCleanupRequest::TPtr event) {
actorSystem->Send(event->Sender, new TEvCleanupResponse(Ydb::StatusIds::NOT_FOUND, NYql::TIssues{NYql::TIssue(TStringBuilder() << "Pool " << event->Get()->PoolId << " not found")}));
});
ActorContext.Send(ev->Sender, new TEvContinueRequest(status, {}, {}, issues));
TActivationContext::Send(ev->Sender, std::make_unique<TEvContinueRequest>(status, TString{}, NResourcePool::TPoolSettings{}, issues));
}
PendingRequersts.clear();
}
};

struct TPoolState {
NActors::TActorId PoolHandler;
NActors::TActorContext ActorContext;

std::queue<TEvPrivate::TEvResolvePoolResponse::TPtr> PendingRequests = {};
bool WaitingInitialization = false;
Expand All @@ -137,7 +136,7 @@ struct TPoolState {
return;
}

ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
TActivationContext::Send(PoolHandler, std::make_unique<TEvPrivate::TEvStopPoolHandler>(false));
PreviousPoolHandlers.insert(PoolHandler);
PoolHandler = *NewPoolHandler;
NewPoolHandler = std::nullopt;
Expand All @@ -151,7 +150,7 @@ struct TPoolState {

PlaceRequestRunning = true;
InFlightRequests++;
ActorContext.Send(PendingRequests.front()->Forward(PoolHandler));
TActivationContext::Send(PendingRequests.front()->Forward(PoolHandler));
PendingRequests.pop();
}

Expand All @@ -163,31 +162,29 @@ struct TPoolState {

void DoCleanupRequest(TEvCleanupRequest::TPtr event) {
for (const auto& poolHandler : PreviousPoolHandlers) {
ActorContext.Send(poolHandler, new TEvCleanupRequest(
TActivationContext::Send(poolHandler, std::make_unique<TEvCleanupRequest>(
event->Get()->DatabaseId, event->Get()->SessionId,
event->Get()->PoolId, event->Get()->Duration, event->Get()->CpuConsumed
));
}
ActorContext.Send(event->Forward(PoolHandler));
TActivationContext::Send(event->Forward(PoolHandler));
}
};

struct TCpuQuotaManagerState {
TCpuQuotaManager CpuQuotaManager;
NActors::TActorContext ActorContext;
bool CpuLoadRequestRunning = false;
TInstant CpuLoadRequestTime = TInstant::Zero();

TCpuQuotaManagerState(NActors::TActorContext actorContext, NMonitoring::TDynamicCounterPtr subComponent)
TCpuQuotaManagerState(NMonitoring::TDynamicCounterPtr subComponent)
: CpuQuotaManager(TDuration::Seconds(1), TDuration::Seconds(10), IDLE_DURATION, 0.1, true, 0, subComponent)
, ActorContext(actorContext)
{}

void RequestCpuQuota(TActorId poolHandler, double maxClusterLoad, ui64 coockie) {
auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad);

bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS;
ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);
TActivationContext::Send(poolHandler, std::make_unique<TEvPrivate::TEvCpuQuotaResponse>(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);

// Schedule notification
if (!quotaAccepted) {
Expand Down Expand Up @@ -238,7 +235,7 @@ struct TCpuQuotaManagerState {
}

for (const TActorId& poolHandler : poolHandlers) {
ActorContext.Send(poolHandler, new TEvPrivate::TEvRefreshPoolState());
TActivationContext::Send(poolHandler, std::make_unique<TEvPrivate::TEvRefreshPoolState>());
HandlersLimits.erase(poolHandler);
}
PendingHandlers.erase(PendingHandlers.begin());
Expand Down

0 comments on commit 6dac5e0

Please sign in to comment.