Skip to content

Commit

Permalink
YQ-3371 fixed WM useg without feature flag (ydb-platform#6203)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 3, 2024
1 parent 00612bd commit 895e035
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
5 changes: 5 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return ProgressStatsPeriod;
}

void SetPoolId(const TString& poolId) {
PoolId = poolId;
Record.MutableRequest()->SetPoolId(PoolId);
}

TString GetPoolId() const {
if (PoolId) {
return PoolId;
Expand Down
18 changes: 16 additions & 2 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/workload_service/kqp_workload_service.h>
#include <ydb/core/resource_pools/resource_pool_settings.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/library/yql/dq/actors/spilling/spilling_file.h>
#include <ydb/library/yql/dq/actors/spilling/spilling.h>
Expand Down Expand Up @@ -187,6 +188,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
): LogConfig(logConfig)
, TableServiceConfig(tableServiceConfig)
, QueryServiceConfig(queryServiceConfig)
, FeatureFlags()
, KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings)))
, FederatedQuerySetupFactory(federatedQuerySetupFactory)
, QueryReplayFactory(std::move(queryReplayFactory))
Expand All @@ -199,6 +201,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
void Bootstrap(const TActorContext &ctx) {
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(KQP_PROVIDER));
Counters = MakeIntrusive<TKqpCounters>(AppData()->Counters, &TlsActivationContext->AsActorContext());
FeatureFlags = AppData()->FeatureFlags;
// NOTE: some important actors are constructed within next call
FederatedQuerySetup = FederatedQuerySetupFactory->Make(ctx.ActorSystem());
AsyncIoFactory = CreateKqpAsyncIoFactory(Counters, FederatedQuerySetup, S3ActorsFactory);
Expand All @@ -219,12 +222,13 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {

UpdateYqlLogLevels();

// Subscribe for TableService & Logger config changes
// Subscribe for TableService & Logger & FeatureFlags config changes
ui32 tableServiceConfigKind = (ui32)NKikimrConsole::TConfigItem::TableServiceConfigItem;
ui32 logConfigKind = (ui32)NKikimrConsole::TConfigItem::LogConfigItem;
ui32 featureFlagsKind = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()),
new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest(
{tableServiceConfigKind, logConfigKind}),
{tableServiceConfigKind, logConfigKind, featureFlagsKind}),
IEventHandle::FlagTrackDelivery);

WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
Expand Down Expand Up @@ -482,6 +486,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
LogConfig.Swap(event.MutableConfig()->MutableLogConfig());
UpdateYqlLogLevels();

FeatureFlags.Swap(event.MutableConfig()->MutableFeatureFlags());

auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
PublishResourceUsage();
Expand Down Expand Up @@ -681,6 +687,13 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
LocalSessions->AttachQueryText(sessionInfo, ev->Get()->GetQuery());
}

if (!FeatureFlags.GetEnableResourcePools()) {
ev->Get()->SetPoolId("");
} else if (!ev->Get()->GetPoolId()) {
// TODO: do not use default pool if there is no limits
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
}

TActorId targetId;
if (sessionInfo) {
targetId = sessionInfo->WorkerId;
Expand Down Expand Up @@ -1742,6 +1755,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
NKikimrConfig::TLogConfig LogConfig;
NKikimrConfig::TTableServiceConfig TableServiceConfig;
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
NKikimrConfig::TFeatureFlags FeatureFlags;
TKqpSettings::TConstPtr KqpSettings;
IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory;
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,11 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

QueryState->UpdateTempTablesState(TempTablesState);

PassRequestToResourcePool();
if (QueryState->UserRequestContext->PoolId) {
PassRequestToResourcePool();
} else {
CompileQuery();
}
}

void Handle(TEvents::TEvUndelivered::TPtr& ev) {
Expand Down

0 comments on commit 895e035

Please sign in to comment.