From b25234687860c21fd17c2d82e0f2b4ea4d204c84 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 19 Dec 2023 08:43:17 +0000 Subject: [PATCH] Yt supported --- ydb/apps/ydbd/ya.make | 1 + ydb/core/config/ut/main.cpp | 22 ++++++ ydb/core/driver_lib/run/ya.make | 2 + .../kqp/compile_service/kqp_compile_actor.cpp | 2 +- .../kqp/compute_actor/kqp_compute_actor.cpp | 8 +- ydb/core/kqp/compute_actor/ya.make | 1 + .../kqp_federated_query_helpers.cpp | 19 ++++- .../kqp_federated_query_helpers.h | 18 ++++- ydb/core/kqp/federated_query/ya.make | 2 + ydb/core/kqp/host/kqp_host.cpp | 37 ++++++++- ydb/core/kqp/host/kqp_host.h | 2 +- ydb/core/kqp/host/kqp_translate.cpp | 1 + ydb/core/kqp/node_service/kqp_node_ut.cpp | 2 +- ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 75 ++++++++++++++++++ ydb/core/kqp/opt/physical/kqp_opt_phy.cpp | 7 ++ .../kqp/provider/yql_kikimr_opt_build.cpp | 3 +- .../kqp/query_compiler/kqp_mkql_compiler.cpp | 6 +- .../kqp/session_actor/kqp_session_actor.cpp | 14 ++++ .../kqp/session_actor/kqp_worker_actor.cpp | 2 +- .../kqp/ut/federated_query/common/common.cpp | 4 +- ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 2 +- ydb/core/kqp/ut/opt/kqp_kv_ut.cpp | 1 - ydb/core/protos/config.proto | 3 + ydb/core/protos/ya.make | 1 + ydb/core/testlib/test_client.cpp | 4 +- ydb/core/testlib/ya.make | 2 + .../core/services/yql_transform_pipeline.cpp | 12 +-- .../core/services/yql_transform_pipeline.h | 4 +- .../yt/provider/yql_yt_datasink_finalize.cpp | 5 ++ .../yt/provider/yql_yt_datasource.cpp | 33 ++++++-- .../yt/provider/yql_yt_physical_optimize.cpp | 34 ++++++-- .../providers/yt/provider/yql_yt_provider.cpp | 77 +++++++++++-------- .../providers/yt/provider/yql_yt_provider.h | 3 + .../yql/sql/settings/translation_settings.h | 1 + ydb/library/yql/sql/v0/query.cpp | 11 +-- ydb/library/yql/sql/v1/query.cpp | 11 +-- .../kqprun/configuration/app_config.conf | 14 ++++ 37 files changed, 358 insertions(+), 88 deletions(-) diff --git a/ydb/apps/ydbd/ya.make b/ydb/apps/ydbd/ya.make index 90642fe153c2..d894acdb873d 100644 --- a/ydb/apps/ydbd/ya.make +++ b/ydb/apps/ydbd/ya.make @@ -83,6 +83,7 @@ CHECK_DEPENDENT_DIRS( tools/rorescompiler util ydb + yt ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/config/ut/main.cpp b/ydb/core/config/ut/main.cpp index 9edcba3f7665..c29ffde95d11 100644 --- a/ydb/core/config/ut/main.cpp +++ b/ydb/core/config/ut/main.cpp @@ -221,6 +221,17 @@ Y_UNIT_TEST_SUITE(ConfigProto) { "/AppConfig/FederatedQueryConfig/Gateways/Ydb/DefaultSettings/Name/Name", "/AppConfig/FederatedQueryConfig/Gateways/Pq/ClusterMapping/Settings/Name/Name", "/AppConfig/FederatedQueryConfig/Gateways/Pq/DefaultSettings/Name/Name", + "/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Hour/Hour", + "/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Percentage/Percentage", + "/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Name/Name", + "/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Value/Value", + "/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Hour/Hour", + "/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Percentage/Percentage", + "/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Name/Name", + "/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Value/Value", + "/AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Pattern/Pattern", + "/AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Cluster/Cluster", + "/AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Path/Path", "/AppConfig/QueryServiceConfig/Generic/DefaultSettings/Value/Value", "/AppConfig/FederatedQueryConfig/Gateways/S3/ClusterMapping/Settings/Value/Value", "/AppConfig/QueryServiceConfig/S3/ClusterMapping/Settings/Value/Value", @@ -298,6 +309,17 @@ Y_UNIT_TEST_SUITE(ConfigProto) { {58, 9, 2, 102, 1, 1}, // /AppConfig/FederatedQueryConfig/Gateways/Dq/DefaultSettings/Name/Name {73, 6, 100, 2, 2}, // /AppConfig/QueryServiceConfig/S3/DefaultSettings/Value/Value {73, 11, 6, 2, 2}, // /AppConfig/QueryServiceConfig/Generic/DefaultSettings/Value/Value + {73, 15, 101, 100, 3, 2, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Hour/Hour + {73, 15, 101, 100, 3, 2, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Percentage/Percentage + {73, 15, 101, 100, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Name/Name + {73, 15, 101, 100, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Value/Value + {73, 15, 102, 3, 2, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Hour/Hour + {73, 15, 102, 3, 2, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Percentage/Percentage + {73, 15, 102, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Name/Name + {73, 15, 102, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Value/Value + {73, 15, 100, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Pattern/Pattern + {73, 15, 100, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Cluster/Cluster + {73, 15, 100, 3, 3}, // /AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Path/Path {58, 9, 6, 1, 100, 2, 2}, // /AppConfig/FederatedQueryConfig/Gateways/Solomon/ClusterMapping/Settings/Value/Value {58, 9, 5, 1, 100, 2, 2}, // /AppConfig/FederatedQueryConfig/Gateways/S3/ClusterMapping/Settings/Value/Value {58, 9, 6, 2, 2, 2}, // /AppConfig/FederatedQueryConfig/Gateways/Solomon/DefaultSettings/Value/Value diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 62714d224329..b808fcbc9b57 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -135,6 +135,8 @@ PEERDIR( ydb/library/pdisk_io ydb/library/security ydb/library/yql/minikql/comp_nodes/llvm14 + ydb/library/yql/providers/yt/codec/codegen + ydb/library/yql/providers/yt/comp_nodes/llvm14 ydb/library/yql/providers/pq/cm_client ydb/library/yql/public/udf/service/exception_policy ydb/public/lib/base diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index baaedd0924e1..3050f99e6de2 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -181,7 +181,7 @@ class TKqpCompileActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver, - FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState)); + FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState), nullptr, UserRequestContext->SessionId); IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted; diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 3e9f324caa6e..bbf867904883 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace NKikimr { @@ -25,13 +26,18 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput MKQL_ENSURE_S(computeCtx); auto computeFactory = GetKqpBaseComputeFactory(computeCtx); + auto ytComputeFactory = NYql::GetDqYtFactory(); - return [computeFactory, computeCtx] + return [computeFactory, ytComputeFactory, computeCtx] (TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (auto compute = computeFactory(callable, ctx)) { return compute; } + if (auto ytCompute = ytComputeFactory(callable, ctx)) { + return ytCompute; + } + auto name = callable.GetType()->GetName(); if (name == "KqpWideReadTable"sv) { diff --git a/ydb/core/kqp/compute_actor/ya.make b/ydb/core/kqp/compute_actor/ya.make index 570d174d7394..cc5813739683 100644 --- a/ydb/core/kqp/compute_actor/ya.make +++ b/ydb/core/kqp/compute_actor/ya.make @@ -24,6 +24,7 @@ PEERDIR( ydb/library/yql/dq/actors/compute ydb/library/yql/providers/generic/actors ydb/library/yql/providers/s3/actors + ydb/library/yql/providers/yt/comp_nodes/dq ydb/library/yql/public/issue ) diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 6104e99ef9d0..04e91aa190f1 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -11,10 +11,20 @@ #include #include +#include +#include + #include #include namespace NKikimr::NKqp { + NYql::IYtGateway::TPtr MakeYtGateway(const NMiniKQL::IFunctionRegistry* functionRegistry, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) { + NYql::TYtNativeServices ytServices; + ytServices.FunctionRegistry = functionRegistry; + ytServices.FileStorage = WithAsync(CreateFileStorage(queryServiceConfig.GetFileStorage(), {MakeYtDownloader(queryServiceConfig.GetFileStorage())})); + ytServices.Config = std::make_shared(queryServiceConfig.GetYt()); + return CreateYtNativeGateway(ytServices); + } NYql::THttpGatewayConfig DefaultHttpGatewayConfig() { NYql::THttpGatewayConfig config; @@ -53,6 +63,9 @@ namespace NKikimr::NKqp { S3GatewayConfig = queryServiceConfig.GetS3(); + YtGatewayConfig = queryServiceConfig.GetYt(); + YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig); + // Initialize Token Accessor if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) { const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig(); @@ -102,7 +115,9 @@ namespace NKikimr::NKqp { CredentialsFactory, nullptr, S3GatewayConfig, - GenericGatewaysConfig}; + GenericGatewaysConfig, + YtGatewayConfig, + YtGateway}; // Init DatabaseAsyncResolver only if all requirements are met if (DatabaseResolverActorId && GenericGatewaysConfig.HasMdbGateway() && MdbEndpointGenerator) { @@ -128,4 +143,4 @@ namespace NKikimr::NKqp { return std::make_shared(setup, appData, appConfig); } -} +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h index 2db1e5a9f725..489da2b4a1be 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h @@ -8,8 +8,10 @@ #include #include #include +#include namespace NKikimr::NKqp { + NYql::IYtGateway::TPtr MakeYtGateway(const NMiniKQL::IFunctionRegistry* functionRegistry, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig); struct TKqpFederatedQuerySetup { NYql::IHTTPGateway::TPtr HttpGateway; @@ -18,6 +20,8 @@ namespace NKikimr::NKqp { NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver; NYql::TS3GatewayConfig S3GatewayConfig; NYql::TGenericGatewayConfig GenericGatewayConfig; + NYql::TYtGatewayConfig YtGatewayConfig; + NYql::IYtGateway::TPtr YtGateway; }; struct IKqpFederatedQuerySetupFactory { @@ -47,6 +51,8 @@ namespace NKikimr::NKqp { NYql::IHTTPGateway::TPtr HttpGateway; NYql::TS3GatewayConfig S3GatewayConfig; NYql::TGenericGatewayConfig GenericGatewaysConfig; + NYql::TYtGatewayConfig YtGatewayConfig; + NYql::IYtGateway::TPtr YtGateway; NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; NYql::NConnector::IClient::TPtr ConnectorClient; std::optional DatabaseResolverActorId; @@ -62,19 +68,23 @@ namespace NKikimr::NKqp { NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver, const NYql::TS3GatewayConfig& s3GatewayConfig, - const NYql::TGenericGatewayConfig& genericGatewayConfig) + const NYql::TGenericGatewayConfig& genericGatewayConfig, + const NYql::TYtGatewayConfig& ytGatewayConfig, + NYql::IYtGateway::TPtr ytGateway) : HttpGateway(httpGateway) , ConnectorClient(connectorClient) , CredentialsFactory(credentialsFactory) , DatabaseAsyncResolver(databaseAsyncResolver) , S3GatewayConfig(s3GatewayConfig) , GenericGatewayConfig(genericGatewayConfig) + , YtGatewayConfig(ytGatewayConfig) + , YtGateway(ytGateway) { } std::optional Make(NActors::TActorSystem*) override { return TKqpFederatedQuerySetup{ - HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig}; + HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, YtGatewayConfig, YtGateway}; } private: @@ -84,10 +94,12 @@ namespace NKikimr::NKqp { NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver; NYql::TS3GatewayConfig S3GatewayConfig; NYql::TGenericGatewayConfig GenericGatewayConfig; + NYql::TYtGatewayConfig YtGatewayConfig; + NYql::IYtGateway::TPtr YtGateway; }; IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory( NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData, const NKikimrConfig::TAppConfig& config); -} +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/federated_query/ya.make b/ydb/core/kqp/federated_query/ya.make index 41397c2ee5f3..d2b9bff59074 100644 --- a/ydb/core/kqp/federated_query/ya.make +++ b/ydb/core/kqp/federated_query/ya.make @@ -12,6 +12,8 @@ PEERDIR( ydb/library/db_pool/protos ydb/library/yql/providers/common/http_gateway ydb/library/yql/providers/generic/connector/libcpp + ydb/library/yql/providers/yt/gateway/native + ydb/library/yql/providers/yt/lib/yt_download ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 7a46c7157b6a..e42494ccad9e 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -958,9 +959,10 @@ class TKqpHost : public IKqpHost { std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, - NActors::TActorSystem* actorSystem = nullptr) + NActors::TActorSystem* actorSystem = nullptr, TString sessionId = "") : Gateway(gateway) , Cluster(cluster) + , SessionId(sessionId) , ExprCtx(new TExprContext()) , ModuleResolver(moduleResolver) , KeepConfigChanges(keepConfigChanges) @@ -1536,6 +1538,29 @@ class TKqpHost : public IKqpHost { TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state)); } + void InitYtProvider() { + Y_ENSURE(SessionId); + + TString userName = CreateGuidAsString(); + if (SessionCtx->GetUserToken() && SessionCtx->GetUserToken()->GetUserSID()) { + userName = SessionCtx->GetUserToken()->GetUserSID(); + } + + auto [ytState, statWriter] = CreateYtNativeState(FederatedQuerySetup->YtGateway, userName, SessionId, &FederatedQuerySetup->YtGatewayConfig, TypesCtx); + + ytState->PassiveExecution = true; + ytState->Gateway->OpenSession( + IYtGateway::TOpenSessionOptions(SessionId) + .UserName(userName) + .RandomProvider(TAppData::RandomProvider) + .TimeProvider(TAppData::TimeProvider) + .StatWriter(statWriter) + ); + + TypesCtx->AddDataSource(YtProviderName, CreateYtDataSource(ytState)); + TypesCtx->AddDataSink(YtProviderName, CreateYtDataSink(ytState)); + } + void InitPgProvider() { auto state = MakeIntrusive(); state->Types = TypesCtx.Get(); @@ -1577,6 +1602,9 @@ class TKqpHost : public IKqpHost { if (addExternalDataSources && FederatedQuerySetup) { InitS3Provider(queryType); InitGenericProvider(); + if (FederatedQuerySetup->YtGatewayConfig.ClusterMappingSize()) { + InitYtProvider(); + } } InitPgProvider(); @@ -1619,7 +1647,7 @@ class TKqpHost : public IKqpHost { .AddPreTypeAnnotation() .AddExpressionEvaluation(*FuncRegistry) .Add(new TFailExpressionEvaluation(), "FailExpressionEvaluation") - .AddIOAnnotation() + .AddIOAnnotation(false) .AddTypeAnnotation() .Add(TCollectParametersTransformer::Sync(SessionCtx->QueryPtr()), "CollectParameters") .AddPostTypeAnnotation() @@ -1674,6 +1702,7 @@ class TKqpHost : public IKqpHost { private: TIntrusivePtr Gateway; TString Cluster; + TString SessionId; THolder ExprCtx; IModuleResolver::TPtr ModuleResolver; bool KeepConfigChanges; @@ -1722,10 +1751,10 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, - TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem) + TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, TString sessionId) { return MakeIntrusive(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry, - keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem); + keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, sessionId); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 3b8022221dff..44336908ed30 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -111,7 +111,7 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, - NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/); + NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/, TString sessionId = ""); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_translate.cpp b/ydb/core/kqp/host/kqp_translate.cpp index 48eaa8f5b7d8..22bac53c0393 100644 --- a/ydb/core/kqp/host/kqp_translate.cpp +++ b/ydb/core/kqp/host/kqp_translate.cpp @@ -87,6 +87,7 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT if (isEnableExternalDataSources) { settings.DynamicClusterProvider = NYql::KikimrProviderName; settings.BindingsMode = bindingsMode; + settings.SaveWorldDependencies = true; } settings.InferSyntaxVersion = true; diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index fdcca8f068c4..d6a04dd3e1c9 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -184,7 +184,7 @@ class KqpNode : public TTestBase { Runtime->EnableScheduleForActor(ResourceManagerActorId, true); WaitForBootstrap(); - auto FederatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}}); + auto FederatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr}); auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup); auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory); KqpNodeActorId = Runtime->Register(kqpNode); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 9fea92a6d844..0e86edffc087 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -54,16 +54,27 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable)); AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend)); AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend)); + AddHandler(0, &TCoUnorderedBase::Match, HNDL(UnorderedOverDqReadWrap)); + AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqReadWrap)); + AddHandler(0, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqReadWrap)); + AddHandler(0, &TCoExtendBase::Match, HNDL(ExtendOverDqReadWrap)); + AddHandler(0, &TCoNarrowMap::Match, HNDL(DqReadWideWrapFieldSubset)); + AddHandler(0, &TCoNarrowFlatMap::Match, HNDL(DqReadWideWrapFieldSubset)); + AddHandler(0, &TCoNarrowMultiMap::Match, HNDL(DqReadWideWrapFieldSubset)); + AddHandler(0, &TCoWideMap::Match, HNDL(DqReadWideWrapFieldSubset)); AddHandler(1, &TCoFlatMap::Match, HNDL(LatePushExtractedPredicateToReadTable)); AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead)); AddHandler(1, &TCoTopSort::Match, HNDL(RewriteTopSortOverIndexRead)); AddHandler(1, &TCoTake::Match, HNDL(RewriteTakeOverIndexRead)); + AddHandler(1, &TDqReadWrapBase::Match, HNDL(DqReadWrapByProvider)); AddHandler(2, &TKqlReadTableIndex::Match, HNDL(RewriteIndexRead)); AddHandler(2, &TKqlLookupIndex::Match, HNDL(RewriteLookupIndex)); AddHandler(2, &TKqlStreamLookupIndex::Match, HNDL(RewriteStreamLookupIndex)); AddHandler(2, &TKqlReadTableIndexRanges::Match, HNDL(RewriteIndexRead)); + AddHandler(2, &TDqReadWrap::Match, HNDL(ExtractMembersOverDqReadWrapMultiUsage)); + AddHandler(2, &TDqReadWrapBase::Match, HNDL(UnorderedOverDqReadWrapMultiUsage)); AddHandler(3, &TKqlLookupTable::Match, HNDL(RewriteLookupTable)); @@ -256,6 +267,70 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { return output; } + TMaybeNode UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + auto output = NDq::UnorderedOverDqReadWrap(node, ctx, getParents, true, TypesCtx); + if (output) { + DumpAppliedRule("UnorderedOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode ExtractMembersOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + auto output = NDq::ExtractMembersOverDqReadWrap(node, ctx, getParents, true, TypesCtx); + if (output) { + DumpAppliedRule("ExtractMembersOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode TakeOrSkipOverDqReadWrap(TExprBase node, TExprContext& ctx) { + auto output = NDq::TakeOrSkipOverDqReadWrap(node, ctx, TypesCtx); + if (output) { + DumpAppliedRule("TakeOrSkipOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode ExtendOverDqReadWrap(TExprBase node, TExprContext& ctx) { + auto output = NDq::ExtendOverDqReadWrap(node, ctx, TypesCtx); + if (output) { + DumpAppliedRule("ExtendOverDqReadWrap", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode DqReadWideWrapFieldSubset(TExprBase node, TExprContext& ctx, const TGetParents& getParents) { + auto output = NDq::DqReadWideWrapFieldSubset(node, ctx, getParents, TypesCtx); + if (output) { + DumpAppliedRule("DqReadWideWrapFieldSubset", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode DqReadWrapByProvider(TExprBase node, TExprContext& ctx) { + auto output = NDq::DqReadWrapByProvider(node, ctx, TypesCtx); + if (output) { + DumpAppliedRule("DqReadWrapByProvider", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode ExtractMembersOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + auto output = NDq::ExtractMembersOverDqReadWrapMultiUsage(node, ctx, optCtx, getParents, TypesCtx); + if (output) { + DumpAppliedRule("ExtractMembersOverDqReadWrapMultiUsage", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + + TMaybeNode UnorderedOverDqReadWrapMultiUsage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { + auto output = NDq::UnorderedOverDqReadWrapMultiUsage(node, ctx, optCtx, getParents, TypesCtx); + if (output) { + DumpAppliedRule("UnorderedOverDqReadWrapMultiUsage", node.Ptr(), output.Cast().Ptr(), ctx); + } + return output; + } + template TMaybeNode ApplyExtractMembersToReadTable(TExprBase node, TExprContext& ctx, const TGetParents& getParents) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index b3a4cea9aeaf..0baf6001e694 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -30,6 +30,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { { #define HNDL(name) "KqpPhysical-"#name, Hndl(&TKqpPhysicalOptTransformer::name) AddHandler(0, &TDqSourceWrap::Match, HNDL(BuildStageWithSourceWrap)); + AddHandler(0, &TDqReadWrap::Match, HNDL(BuildStageWithReadWrap)); AddHandler(0, &TKqlReadTable::Match, HNDL(BuildReadTableStage)); AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage)); AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage)); @@ -581,6 +582,12 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { return output; } + TMaybeNode BuildStageWithReadWrap(TExprBase node, TExprContext& ctx) { + TExprBase output = DqBuildStageWithReadWrap(node, ctx); + DumpAppliedRule("BuildStageWithReadWrap", node.Ptr(), output.Ptr(), ctx); + return output; + } + private: TTypeAnnotationContext& TypesCtx; const TKqpOptimizeContext& KqpCtx; diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 0127aa5fbc43..f69e2ffbb9f8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -661,9 +661,8 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T if (!ExploreTx(child, ctx, dataSink, txRes, tablesData, types)) { return false; } - - return true; } + return true; } if (node.Maybe() || diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp index c7142edf73b6..a3d5ab84e99c 100644 --- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp @@ -225,20 +225,22 @@ const TKikimrTableMetadata& TKqlCompileContext::GetTableMeta(const TKqpTable& ta TIntrusivePtr CreateKqlCompiler(const TKqlCompileContext& ctx, TTypeAnnotationContext& typesCtx) { auto compiler = MakeIntrusive(); - compiler->AddCallable({TDqSourceWideWrap::CallableName(), TDqSourceWideBlockWrap::CallableName(), TDqReadWideWrap::CallableName()}, + compiler->AddCallable({TDqSourceWideWrap::CallableName(), TDqSourceWideBlockWrap::CallableName(), TDqReadWideWrap::CallableName(), TDqReadBlockWideWrap::CallableName()}, [](const TExprNode& node, NCommon::TMkqlBuildContext&) { YQL_ENSURE(false, "Unsupported reader: " << node.Head().Content()); return TRuntimeNode(); }); + std::unordered_set usedProviders; for (const auto& provider : typesCtx.DataSources) { if (auto* dqIntegration = provider->GetDqIntegration()) { dqIntegration->RegisterMkqlCompiler(*compiler); + usedProviders.emplace(provider->GetName()); } } for (const auto& provider : typesCtx.DataSinks) { - if (auto* dqIntegration = provider->GetDqIntegration()) { + if (auto* dqIntegration = provider->GetDqIntegration(); dqIntegration && !usedProviders.contains(TString(provider->GetName()))) { dqIntegration->RegisterMkqlCompiler(*compiler); } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index e43117fa8dd3..9d87d49e21e3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -170,6 +170,7 @@ class TKqpSessionActor : public TActorBootstrapped { , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) + , ActorSystem(TActivationContext::ActorSystem()) , QueryServiceConfig(queryServiceConfig) , MetadataProviderConfig(metadataProviderConfig) , KqpTempTablesAgentActor(kqpTempTablesAgentActor) @@ -1889,6 +1890,18 @@ class TKqpSessionActor : public TActorBootstrapped { if (isFinal) Counters->ReportSessionActorClosedRequest(Settings.DbCounters); + if (isFinal && FederatedQuerySetup && FederatedQuerySetup->YtGateway) { + FederatedQuerySetup->YtGateway->CloseSession( + IYtGateway::TCloseSessionOptions(SessionId) + ).Subscribe([actorSystem = this->ActorSystem, sessionId = this->SessionId](const NThreading::TFuture& future) { + try { + future.GetValue(); + } catch (...) { + LOG_ERROR_S(*actorSystem, NKikimrServices::KQP_SESSION, "Failed to close yt session with id: " << sessionId << ", exception: " << CurrentExceptionMessage()); + } + }); + } + if (isFinal) { // no longer intrested in any compilation responses CompilationCookie->store(false); @@ -2340,6 +2353,7 @@ class TKqpSessionActor : public TActorBootstrapped { std::optional ShutdownState; TULIDGenerator UlidGen; NTxProxy::TRequestControls RequestControls; + TActorSystem* const ActorSystem; TKqpTempTablesState TempTablesState; diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index f2d7b7aba9d3..835e86105b95 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -188,7 +188,7 @@ class TKqpWorkerActor : public TActorBootstrapped { Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, - FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), AppData(ctx)->FunctionRegistry, !Settings.LongSession, false); + FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, SessionId); auto& queryRequest = QueryState->RequestEv; QueryState->ProxyRequestId = proxyRequestId; diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp index 2d49100c163a..d533827ad224 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.cpp +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -36,7 +36,9 @@ namespace NKikimr::NKqp::NFederatedQueryTest { nullptr, databaseAsyncResolver, appConfig->GetQueryServiceConfig().GetS3(), - appConfig->GetQueryServiceConfig().GetGeneric()); + appConfig->GetQueryServiceConfig().GetGeneric(), + appConfig->GetQueryServiceConfig().GetYt(), + nullptr); auto settings = TKikimrSettings() .SetFeatureFlags(featureFlags) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 7e0fbc7c16c9..df287c5bfe56 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -53,7 +53,7 @@ TIntrusivePtr CreateKikimrQueryProcessor(TIntrusivePtr ga UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings)); kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true); - auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}}); + auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr}); return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem); } diff --git a/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp b/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp index 0597df0d72e9..aad73eda0583 100644 --- a/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp @@ -8,7 +8,6 @@ extern "C" { -#include "postgres.h" #include "catalog/pg_type_d.h" } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 537c326e1b5a..ea9bdb99cb61 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -31,6 +31,7 @@ import "ydb/core/protos/tablet.proto"; import "ydb/core/protos/tenant_pool.proto"; import "ydb/core/protos/tenant_slot_broker.proto"; import "ydb/library/actors/protos/interconnect.proto"; +import "ydb/library/yql/core/file_storage/proto/file_storage.proto"; import "ydb/library/yql/providers/common/proto/gateways_config.proto"; package NKikimrConfig; @@ -968,6 +969,8 @@ message TQueryServiceConfig { repeated string HostnamePatterns = 13; // List of hostname regexps for external data sources; disabled if empty optional NYql.TS3GatewayConfig S3 = 6; + optional NYql.TYtGatewayConfig Yt = 15; + optional NYql.TFileStorageConfig FileStorage = 16; optional NYql.THttpGatewayConfig HttpGateway = 7; optional NYql.TGenericConnectorConfig Connector = 8 [deprecated=true]; optional string MdbGateway = 9 [deprecated=true]; diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 8c5e3dd9fc52..92b6895a8850 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -153,6 +153,7 @@ PEERDIR( ydb/library/login/protos ydb/library/mkql_proto/protos ydb/public/api/protos + ydb/library/yql/core/file_storage/proto ydb/library/yql/core/issue/protos ydb/library/yql/dq/actors/protos ydb/library/yql/dq/proto diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 60427d069068..bf5fef2c619d 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -880,7 +880,9 @@ namespace Tests { Settings->CredentialsFactory, databaseAsyncResolver, queryServiceConfig.GetS3(), - queryServiceConfig.GetGeneric() + queryServiceConfig.GetGeneric(), + queryServiceConfig.GetYt(), + NKqp::MakeYtGateway(GetFunctionRegistry(), queryServiceConfig) ); } diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make index e7a93b69ae78..5f3f7aa58928 100644 --- a/ydb/core/testlib/ya.make +++ b/ydb/core/testlib/ya.make @@ -82,6 +82,8 @@ PEERDIR( ydb/library/persqueue/topic_parser ydb/library/security ydb/library/yql/minikql/comp_nodes/llvm14 + ydb/library/yql/providers/yt/codec/codegen + ydb/library/yql/providers/yt/comp_nodes/llvm14 ydb/library/yql/public/udf/service/exception_policy ydb/public/lib/base ydb/public/lib/deprecated/kicli diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp index 36a14f7af7f7..9a13de3cd7a2 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp +++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp @@ -81,18 +81,20 @@ TTransformationPipeline& TTransformationPipeline::AddPreTypeAnnotation(EYqlIssue return *this; } -TTransformationPipeline& TTransformationPipeline::AddPreIOAnnotation(EYqlIssueCode issueCode) { +TTransformationPipeline& TTransformationPipeline::AddPreIOAnnotation(bool withEpochsTransformer, EYqlIssueCode issueCode) { Transformers_.push_back(TTransformStage( CreateIODiscoveryTransformer(*TypeAnnotationContext_), "IODiscovery", issueCode)); - Transformers_.push_back(TTransformStage( - CreateEpochsTransformer(*TypeAnnotationContext_), "Epochs", issueCode)); + if (withEpochsTransformer) { + Transformers_.push_back(TTransformStage( + CreateEpochsTransformer(*TypeAnnotationContext_), "Epochs", issueCode)); + } AddIntentDeterminationTransformer(); return *this; } -TTransformationPipeline& TTransformationPipeline::AddIOAnnotation(EYqlIssueCode issueCode) { - AddPreIOAnnotation(issueCode); +TTransformationPipeline& TTransformationPipeline::AddIOAnnotation(bool withEpochsTransformer, EYqlIssueCode issueCode) { + AddPreIOAnnotation(withEpochsTransformer, issueCode); AddTableMetadataLoaderTransformer(); auto& typeCtx = *TypeAnnotationContext_; diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.h b/ydb/library/yql/core/services/yql_transform_pipeline.h index 37b52d5e1f8b..c18a693a07d8 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.h +++ b/ydb/library/yql/core/services/yql_transform_pipeline.h @@ -27,8 +27,8 @@ class TTransformationPipeline TTransformationPipeline& AddPreTypeAnnotation(EYqlIssueCode issueCode = TIssuesIds::CORE_PRE_TYPE_ANN); TTransformationPipeline& AddExpressionEvaluation(const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, IGraphTransformer* calcTransfomer = nullptr, EYqlIssueCode issueCode = TIssuesIds::CORE_EXPR_EVALUATION); - TTransformationPipeline& AddPreIOAnnotation(EYqlIssueCode issueCode = TIssuesIds::CORE_PRE_TYPE_ANN); - TTransformationPipeline& AddIOAnnotation(EYqlIssueCode issueCode = TIssuesIds::CORE_PRE_TYPE_ANN); + TTransformationPipeline& AddPreIOAnnotation(bool withEpochsTransformer = true, EYqlIssueCode issueCode = TIssuesIds::CORE_PRE_TYPE_ANN); + TTransformationPipeline& AddIOAnnotation(bool withEpochsTransformer = true, EYqlIssueCode issueCode = TIssuesIds::CORE_PRE_TYPE_ANN); TTransformationPipeline& AddTypeAnnotation(EYqlIssueCode issueCode = TIssuesIds::CORE_TYPE_ANN); TTransformationPipeline& AddPostTypeAnnotation(bool forSubGraph = false, bool disableConstraintCheck = false, EYqlIssueCode issueCode = TIssuesIds::CORE_POST_TYPE_ANN); TTransformationPipeline& AddCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp index 9a207259d8d6..234c483361c0 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_finalize.cpp @@ -16,6 +16,11 @@ class TYtDataSinkFinalizingTransformer: public TAsyncCallbackTransformer CallbackTransform(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { Y_UNUSED(ctx); output = input; + + if (State_->PassiveExecution) { + return SyncStatus(IGraphTransformer::TStatus::Ok); + } + auto future = State_->Gateway->Finalize( IYtGateway::TFinalizeOptions(State_->SessionId) .Config(State_->Configuration->Snapshot()) diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp index 09bc7f513e8b..ce516c119987 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource.cpp @@ -190,15 +190,19 @@ class TYtDataSource : public TDataProviderBase { TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) override { YQL_CLOG(INFO, ProviderYt) << "RewriteIO"; - if (auto left = TMaybeNode(node)) { - return left.Input().Maybe().World().Cast().Ptr(); + + auto read = TCoInputBase(node).Input().Cast(); + bool buildLeft = TCoLeft::Match(node.Get()); + bool buildReadTableScheme = NYql::HasSetting(read.Arg(4).Ref(), EYtSettingType::Scheme); + + if (buildLeft && (buildReadTableScheme || !State_->PassiveExecution)) { + return read.World().Ptr(); } - auto read = TCoRight(node).Input().Cast(); - if (NYql::HasSetting(read.Arg(4).Ref(), EYtSettingType::Scheme)) { + if (buildReadTableScheme) { YQL_ENSURE(read.Arg(2).Maybe().Item(0).Maybe()); - auto newRead = InjectUdfRemapperOrView(read, ctx, true); + auto newRead = InjectUdfRemapperOrView(read, ctx, true, false); return Build(ctx, node->Pos()) .Input() @@ -213,7 +217,7 @@ class TYtDataSource : public TDataProviderBase { } YQL_ENSURE(read.Arg(2).Maybe().Item(0).Maybe()); // At least one table - return InjectUdfRemapperOrView(read, ctx, false); + return InjectUdfRemapperOrView(read, ctx, false, buildLeft); } void PostRewriteIO() final { @@ -502,7 +506,7 @@ class TYtDataSource : public TDataProviderBase { } private: - TExprNode::TPtr InjectUdfRemapperOrView(TYtRead readNode, TExprContext& ctx, bool fromReadSchema) { + TExprNode::TPtr InjectUdfRemapperOrView(TYtRead readNode, TExprContext& ctx, bool fromReadSchema, bool buildLeft) { const bool weakConcat = NYql::HasSetting(readNode.Arg(4).Ref(), EYtSettingType::WeakConcat); const bool ignoreNonExisting = NYql::HasSetting(readNode.Arg(4).Ref(), EYtSettingType::IgnoreNonExisting); const bool warnNonExisting = NYql::HasSetting(readNode.Arg(4).Ref(), EYtSettingType::WarnNonExisting); @@ -582,7 +586,7 @@ class TYtDataSource : public TDataProviderBase { auto userSchema = GetSetting(table.Settings().Ref(), EYtSettingType::UserSchema); if (userSchema) { tableReads.push_back(ctx.Builder(table.Pos()) - .Callable("Right!") + .Callable(buildLeft ? "Left!" : "Right!") .Add(0, BuildEmptyTablesRead(table.Pos(), *userSchema, ctx)) .Seal() .Build()); @@ -660,6 +664,15 @@ class TYtDataSource : public TDataProviderBase { .Build() .Done(); + if (buildLeft) { + TExprNode::TPtr leftOverRead = Build(ctx, readNode.Pos()) + .Input(origReadNode) + .Done().Ptr(); + + tableReads.push_back(leftOverRead); + continue; + } + TExprNode::TPtr rightOverRead = inlineContent ? Build(ctx, readNode.Pos()) .Input(origReadNode) @@ -822,6 +835,10 @@ class TYtDataSource : public TDataProviderBase { tableReads.push_back(newReadNode); } + if (buildLeft) { + return ctx.NewCallable(readNode.Pos(), TCoSync::CallableName(), std::move(tableReads)); + } + if (tableReads.empty()) { if (hasNonExisting) { ctx.AddError(TIssue(ctx.GetPosition(readNode.Pos()), "The list of tables is empty")); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp index 65a5e095f223..596dcad4b406 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_optimize.cpp @@ -1501,7 +1501,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { template TMaybeNode Sort(TExprBase node, TExprContext& ctx) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -1740,7 +1740,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode PartitionByKey(TExprBase node, TExprContext& ctx) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -2611,7 +2611,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode FlatMap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -2696,7 +2696,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode CombineByKey(TExprBase node, TExprContext& ctx) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -2884,6 +2884,10 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode DqWrite(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { + if (State_->PassiveExecution) { + return node; + } + auto write = node.Cast(); if (!TDqCnUnionAll::Match(write.Content().Raw())) { return node; @@ -3370,6 +3374,10 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode Fill(TExprBase node, TExprContext& ctx) const { + if (State_->PassiveExecution) { + return node; + } + auto write = node.Cast(); auto mode = NYql::GetSetting(write.Settings().Ref(), EYtSettingType::Mode); @@ -3527,6 +3535,10 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode Extend(TExprBase node, TExprContext& ctx) const { + if (State_->PassiveExecution) { + return node; + } + auto extend = node.Cast(); bool allAreTables = true; @@ -4352,7 +4364,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { template TMaybeNode LMap(TExprBase node, TExprContext& ctx) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -4885,7 +4897,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode AssumeSorted(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -5542,7 +5554,7 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode EquiJoin(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { - if (State_->Types->EvaluationInProgress) { + if (State_->Types->EvaluationInProgress || State_->PassiveExecution) { return node; } @@ -6165,6 +6177,10 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode ReadWithSettings(TExprBase node, TExprContext& ctx) const { + if (State_->PassiveExecution) { + return node; + } + auto maybeRead = node.Cast().Input().Maybe(); if (!maybeRead) { return node; @@ -6241,6 +6257,10 @@ class TYtPhysicalOptProposalTransformer : public TOptimizeTransformerBase { } TMaybeNode ReplaceStatWriteTable(TExprBase node, TExprContext& ctx) const { + if (State_->PassiveExecution) { + return node; + } + auto write = node.Cast(); auto input = write.Input(); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp index 0b7bd3551d7b..3cbcb8d08c8b 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp @@ -333,6 +333,45 @@ void TYtState::LeaveEvaluation(ui64 id) { } } +std::pair, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr typeCtx) { + auto ytState = MakeIntrusive(); + ytState->SessionId = sessionId; + ytState->Gateway = gateway; + ytState->Types = typeCtx.Get(); + ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get()); + + if (ytGatewayConfig) { + std::unordered_set groups; + if (ytState->Types->Credentials != nullptr) { + groups.insert(ytState->Types->Credentials->GetGroups().begin(), ytState->Types->Credentials->GetGroups().end()); + } + auto filter = [userName, ytState, groups = std::move(groups)](const NYql::TAttr& attr) -> bool { + if (!attr.HasActivation()) { + return true; + } + if (NConfig::Allow(attr.GetActivation(), userName, groups)) { + with_lock(ytState->StatisticsMutex) { + ytState->Statistics[Max()].Entries.emplace_back(TStringBuilder() << "Activation:" << attr.GetName(), 0, 0, 0, 0, 1); + } + return true; + } + return false; + }; + + ytState->Configuration->Init(*ytGatewayConfig, filter, *typeCtx); + } + + TStatWriter statWriter = [ytState](ui32 publicId, const TVector& stat) { + with_lock(ytState->StatisticsMutex) { + for (size_t i = 0; i < stat.size(); ++i) { + ytState->Statistics[publicId].Entries.push_back(stat[i]); + } + } + }; + + return {ytState, statWriter}; +} + TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gateway) { return [gateway] ( const TString& userName, @@ -352,40 +391,10 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat TDataProviderInfo info; info.SupportsHidden = true; - auto ytState = MakeIntrusive(); - ytState->SessionId = sessionId; - ytState->Gateway = gateway; - ytState->Types = typeCtx.Get(); - ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get()); - - TStatWriter statWriter = [ytState](ui32 publicId, const TVector& stat) { - with_lock(ytState->StatisticsMutex) { - for (size_t i = 0; i < stat.size(); ++i) { - ytState->Statistics[publicId].Entries.push_back(stat[i]); - } - } - }; - - if (gatewaysConfig) { - std::unordered_set groups; - if (ytState->Types->Credentials != nullptr) { - groups.insert(ytState->Types->Credentials->GetGroups().begin(), ytState->Types->Credentials->GetGroups().end()); - } - auto filter = [userName, ytState, groups = std::move(groups)](const NYql::TAttr& attr) -> bool { - if (!attr.HasActivation()) { - return true; - } - if (NConfig::Allow(attr.GetActivation(), userName, groups)) { - with_lock(ytState->StatisticsMutex) { - ytState->Statistics[Max()].Entries.emplace_back(TStringBuilder() << "Activation:" << attr.GetName(), 0, 0, 0, 0, 1); - } - return true; - } - return false; - }; - - ytState->Configuration->Init(gatewaysConfig->GetYt(), filter, *typeCtx); - } + const TYtGatewayConfig* ytGatewayConfig = gatewaysConfig ? &gatewaysConfig->GetYt() : nullptr; + TIntrusivePtr ytState; + TStatWriter statWriter; + std::tie(ytState, statWriter) = CreateYtNativeState(gateway, userName, sessionId, ytGatewayConfig, typeCtx); info.Names.insert({TString{YtProviderName}}); info.Source = CreateYtDataSource(ytState); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h index 06d9d8f747a6..95860baa8a76 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h @@ -102,6 +102,7 @@ struct TYtState : public TThrRefBase { THolder DqIntegration_; ui32 NextEpochId = 1; bool OnlyNativeExecution = false; + bool PassiveExecution = false; TDuration TimeSpentInHybrid; NMonotonic::TMonotonic HybridStartTime; std::unordered_set HybridInFlightOprations; @@ -111,6 +112,8 @@ struct TYtState : public TThrRefBase { }; +class TYtGatewayConfig; +std::pair, TStatWriter> CreateYtNativeState(IYtGateway::TPtr gateway, const TString& userName, const TString& sessionId, const TYtGatewayConfig* ytGatewayConfig, TIntrusivePtr typeCtx); TIntrusivePtr CreateYtDataSource(TYtState::TPtr state); TIntrusivePtr CreateYtDataSink(TYtState::TPtr state); diff --git a/ydb/library/yql/sql/settings/translation_settings.h b/ydb/library/yql/sql/settings/translation_settings.h index 8e00ace30d96..6a7a498a83bf 100644 --- a/ydb/library/yql/sql/settings/translation_settings.h +++ b/ydb/library/yql/sql/settings/translation_settings.h @@ -81,6 +81,7 @@ namespace NSQLTranslation { EBindingsMode BindingsMode; THashMap Bindings; + bool SaveWorldDependencies = false; // each (name, type) entry in this map is equivalent to // DECLARE $name AS type; diff --git a/ydb/library/yql/sql/v0/query.cpp b/ydb/library/yql/sql/v0/query.cpp index 92411bc0086c..04e28fe7dde5 100644 --- a/ydb/library/yql/sql/v0/query.cpp +++ b/ydb/library/yql/sql/v0/query.cpp @@ -390,12 +390,13 @@ class TInputTablesNode final: public TAstListNode { auto source = Y("DataSource", BuildQuotedAtom(Pos, service), BuildQuotedAtom(Pos, tr.Cluster)); auto options = tr.Options ? Q(tr.Options) : Q(Y()); Add(Y("let", "x", keys->Y(TString(ReadName), "world", source, keys, fields, options))); - if (service != YtProviderName) { - if (InSubquery) { - ctx.Error() << "Using of system '" << service << "' is not allowed in SUBQUERY"; - return false; - } + if (service != YtProviderName && InSubquery) { + ctx.Error() << "Using of system '" << service << "' is not allowed in SUBQUERY"; + return false; + } + + if (service != YtProviderName || ctx.Settings.SaveWorldDependencies) { Add(Y("let", "world", Y(TString(LeftName), "x"))); } diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index 7ee433e2f205..70e04ed8d5f0 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -778,12 +778,13 @@ class TInputTablesNode final: public TAstListNode { auto source = Y("DataSource", BuildQuotedAtom(Pos, tr.Service), Scoped->WrapCluster(tr.Cluster, ctx)); auto options = tr.Options ? Q(tr.Options) : Q(Y()); Add(Y("let", "x", keys->Y(TString(ReadName), "world", source, keys, fields, options))); - if (tr.Service != YtProviderName) { - if (InSubquery) { - ctx.Error() << "Using of system '" << tr.Service << "' is not allowed in SUBQUERY"; - return false; - } + if (tr.Service != YtProviderName && InSubquery) { + ctx.Error() << "Using of system '" << tr.Service << "' is not allowed in SUBQUERY"; + return false; + } + + if (tr.Service != YtProviderName || ctx.Settings.SaveWorldDependencies) { Add(Y("let", "world", Y(TString(LeftName), "x"))); } diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf index 7ef43d6a93cb..b26a1c351557 100644 --- a/ydb/tests/tools/kqprun/configuration/app_config.conf +++ b/ydb/tests/tools/kqprun/configuration/app_config.conf @@ -27,6 +27,13 @@ QueryServiceConfig { ScriptResultRowsLimit: 0 ScriptResultSizeLimit: 10485760 + FileStorage { + MaxFiles: 1000 + MaxSizeMb: 512 + RetryCount: 3 + Threads: 2 + } + Generic { MdbGateway: "https://mdb.api.cloud.yandex.net:443" @@ -69,6 +76,13 @@ QueryServiceConfig { Value: "true" } } + + Yt { + DefaultSettings { + Name: "_EnableYtPartitioning" + Value: "true" + } + } } ResourceBrokerConfig {