Skip to content

Commit

Permalink
Yt supported
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Mar 1, 2024
1 parent 156670e commit b252346
Show file tree
Hide file tree
Showing 37 changed files with 358 additions and 88 deletions.
1 change: 1 addition & 0 deletions ydb/apps/ydbd/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CHECK_DEPENDENT_DIRS(
tools/rorescompiler
util
ydb
yt
)

YQL_LAST_ABI_VERSION()
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/config/ut/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,17 @@ Y_UNIT_TEST_SUITE(ConfigProto) {
"/AppConfig/FederatedQueryConfig/Gateways/Ydb/DefaultSettings/Name/Name",
"/AppConfig/FederatedQueryConfig/Gateways/Pq/ClusterMapping/Settings/Name/Name",
"/AppConfig/FederatedQueryConfig/Gateways/Pq/DefaultSettings/Name/Name",
"/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Hour/Hour",
"/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Percentage/Percentage",
"/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Name/Name",
"/AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Value/Value",
"/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Hour/Hour",
"/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Percentage/Percentage",
"/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Name/Name",
"/AppConfig/QueryServiceConfig/Yt/DefaultSettings/Value/Value",
"/AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Pattern/Pattern",
"/AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Cluster/Cluster",
"/AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Path/Path",
"/AppConfig/QueryServiceConfig/Generic/DefaultSettings/Value/Value",
"/AppConfig/FederatedQueryConfig/Gateways/S3/ClusterMapping/Settings/Value/Value",
"/AppConfig/QueryServiceConfig/S3/ClusterMapping/Settings/Value/Value",
Expand Down Expand Up @@ -298,6 +309,17 @@ Y_UNIT_TEST_SUITE(ConfigProto) {
{58, 9, 2, 102, 1, 1}, // /AppConfig/FederatedQueryConfig/Gateways/Dq/DefaultSettings/Name/Name
{73, 6, 100, 2, 2}, // /AppConfig/QueryServiceConfig/S3/DefaultSettings/Value/Value
{73, 11, 6, 2, 2}, // /AppConfig/QueryServiceConfig/Generic/DefaultSettings/Value/Value
{73, 15, 101, 100, 3, 2, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Hour/Hour
{73, 15, 101, 100, 3, 2, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Activation/ByHour/Percentage/Percentage
{73, 15, 101, 100, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Name/Name
{73, 15, 101, 100, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/ClusterMapping/Settings/Value/Value
{73, 15, 102, 3, 2, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Hour/Hour
{73, 15, 102, 3, 2, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Activation/ByHour/Percentage/Percentage
{73, 15, 102, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Name/Name
{73, 15, 102, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/DefaultSettings/Value/Value
{73, 15, 100, 1, 1}, // /AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Pattern/Pattern
{73, 15, 100, 2, 2}, // /AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Cluster/Cluster
{73, 15, 100, 3, 3}, // /AppConfig/QueryServiceConfig/Yt/RemoteFilePatterns/Path/Path
{58, 9, 6, 1, 100, 2, 2}, // /AppConfig/FederatedQueryConfig/Gateways/Solomon/ClusterMapping/Settings/Value/Value
{58, 9, 5, 1, 100, 2, 2}, // /AppConfig/FederatedQueryConfig/Gateways/S3/ClusterMapping/Settings/Value/Value
{58, 9, 6, 2, 2, 2}, // /AppConfig/FederatedQueryConfig/Gateways/Solomon/DefaultSettings/Value/Value
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ PEERDIR(
ydb/library/pdisk_io
ydb/library/security
ydb/library/yql/minikql/comp_nodes/llvm14
ydb/library/yql/providers/yt/codec/codegen
ydb/library/yql/providers/yt/comp_nodes/llvm14
ydb/library/yql/providers/pq/cm_client
ydb/library/yql/public/udf/service/exception_policy
ydb/public/lib/base
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));
FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState), nullptr, UserRequestContext->SessionId);

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <ydb/library/yql/providers/generic/actors/yql_generic_source_factory.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>
#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>


namespace NKikimr {
Expand All @@ -25,13 +26,18 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
MKQL_ENSURE_S(computeCtx);

auto computeFactory = GetKqpBaseComputeFactory(computeCtx);
auto ytComputeFactory = NYql::GetDqYtFactory();

return [computeFactory, computeCtx]
return [computeFactory, ytComputeFactory, computeCtx]
(TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
if (auto compute = computeFactory(callable, ctx)) {
return compute;
}

if (auto ytCompute = ytComputeFactory(callable, ctx)) {
return ytCompute;
}

auto name = callable.GetType()->GetName();

if (name == "KqpWideReadTable"sv) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ PEERDIR(
ydb/library/yql/dq/actors/compute
ydb/library/yql/providers/generic/actors
ydb/library/yql/providers/s3/actors
ydb/library/yql/providers/yt/comp_nodes/dq
ydb/library/yql/public/issue
)

Expand Down
19 changes: 17 additions & 2 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@
#include <ydb/core/fq/libs/db_id_async_resolver_impl/db_async_resolver_impl.h>
#include <ydb/core/fq/libs/db_id_async_resolver_impl/mdb_endpoint_generator.h>

#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
#include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>

#include <util/system/file.h>
#include <util/stream/file.h>

namespace NKikimr::NKqp {
// Assembles the services for a native YT gateway and returns the gateway created from them.
//
// functionRegistry   - pointer stored as-is in the gateway services.
// queryServiceConfig - supplies the FileStorage section (used for both the file storage
//                      and the YT downloader plugged into it) and the Yt gateway section.
NYql::IYtGateway::TPtr MakeYtGateway(const NMiniKQL::IFunctionRegistry* functionRegistry, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig) {
    const auto& fileStorageConfig = queryServiceConfig.GetFileStorage();

    NYql::TYtNativeServices services;
    services.FunctionRegistry = functionRegistry;
    // File storage is created with a single downloader (the YT one) and then wrapped by WithAsync.
    services.FileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
    // The services hold a shared TYtGatewayConfig constructed from the Yt section of the config.
    services.Config = std::make_shared<NYql::TYtGatewayConfig>(queryServiceConfig.GetYt());

    return CreateYtNativeGateway(services);
}

NYql::THttpGatewayConfig DefaultHttpGatewayConfig() {
NYql::THttpGatewayConfig config;
Expand Down Expand Up @@ -53,6 +63,9 @@ namespace NKikimr::NKqp {

S3GatewayConfig = queryServiceConfig.GetS3();

YtGatewayConfig = queryServiceConfig.GetYt();
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);

// Initialize Token Accessor
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
Expand Down Expand Up @@ -102,7 +115,9 @@ namespace NKikimr::NKqp {
CredentialsFactory,
nullptr,
S3GatewayConfig,
GenericGatewaysConfig};
GenericGatewaysConfig,
YtGatewayConfig,
YtGateway};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && GenericGatewaysConfig.HasMdbGateway() && MdbEndpointGenerator) {
Expand All @@ -128,4 +143,4 @@ namespace NKikimr::NKqp {

return std::make_shared<NKikimr::NKqp::TKqpFederatedQuerySetupFactoryDefault>(setup, appData, appConfig);
}
}
} // namespace NKikimr::NKqp
18 changes: 15 additions & 3 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>

namespace NKikimr::NKqp {
NYql::IYtGateway::TPtr MakeYtGateway(const NMiniKQL::IFunctionRegistry* functionRegistry, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig);

struct TKqpFederatedQuerySetup {
NYql::IHTTPGateway::TPtr HttpGateway;
Expand All @@ -18,6 +20,8 @@ namespace NKikimr::NKqp {
NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewayConfig;
NYql::TYtGatewayConfig YtGatewayConfig;
NYql::IYtGateway::TPtr YtGateway;
};

struct IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -47,6 +51,8 @@ namespace NKikimr::NKqp {
NYql::IHTTPGateway::TPtr HttpGateway;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewaysConfig;
NYql::TYtGatewayConfig YtGatewayConfig;
NYql::IYtGateway::TPtr YtGateway;
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
NYql::NConnector::IClient::TPtr ConnectorClient;
std::optional<NActors::TActorId> DatabaseResolverActorId;
Expand All @@ -62,19 +68,23 @@ namespace NKikimr::NKqp {
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::IDatabaseAsyncResolver::TPtr databaseAsyncResolver,
const NYql::TS3GatewayConfig& s3GatewayConfig,
const NYql::TGenericGatewayConfig& genericGatewayConfig)
const NYql::TGenericGatewayConfig& genericGatewayConfig,
const NYql::TYtGatewayConfig& ytGatewayConfig,
NYql::IYtGateway::TPtr ytGateway)
: HttpGateway(httpGateway)
, ConnectorClient(connectorClient)
, CredentialsFactory(credentialsFactory)
, DatabaseAsyncResolver(databaseAsyncResolver)
, S3GatewayConfig(s3GatewayConfig)
, GenericGatewayConfig(genericGatewayConfig)
, YtGatewayConfig(ytGatewayConfig)
, YtGateway(ytGateway)
{
}

std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
return TKqpFederatedQuerySetup{
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig};
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, YtGatewayConfig, YtGateway};
}

private:
Expand All @@ -84,10 +94,12 @@ namespace NKikimr::NKqp {
NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewayConfig;
NYql::TYtGatewayConfig YtGatewayConfig;
NYql::IYtGateway::TPtr YtGateway;
};

IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
NActors::TActorSystemSetup* setup,
const NKikimr::TAppData* appData,
const NKikimrConfig::TAppConfig& config);
}
} // namespace NKikimr::NKqp
2 changes: 2 additions & 0 deletions ydb/core/kqp/federated_query/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ PEERDIR(
ydb/library/db_pool/protos
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/generic/connector/libcpp
ydb/library/yql/providers/yt/gateway/native
ydb/library/yql/providers/yt/lib/yt_download
)

YQL_LAST_ABI_VERSION()
Expand Down
37 changes: 33 additions & 4 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
#include <ydb/library/yql/providers/pg/provider/yql_pg_provider_impl.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>

#include <library/cpp/cache/cache.h>
Expand Down Expand Up @@ -958,9 +959,10 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr)
NActors::TActorSystem* actorSystem = nullptr, TString sessionId = "")
: Gateway(gateway)
, Cluster(cluster)
, SessionId(sessionId)
, ExprCtx(new TExprContext())
, ModuleResolver(moduleResolver)
, KeepConfigChanges(keepConfigChanges)
Expand Down Expand Up @@ -1536,6 +1538,29 @@ class TKqpHost : public IKqpHost {
TypesCtx->AddDataSink(NYql::GenericProviderName, NYql::CreateGenericDataSink(state));
}

// Creates the YT provider state for this host, opens a gateway session keyed by SessionId,
// and registers the YT data source and data sink in TypesCtx.
// Y_ENSURE fails when SessionId is empty.
void InitYtProvider() {
    Y_ENSURE(SessionId);

    // Start from a freshly generated GUID; it is replaced when the session's
    // user token exists and carries a non-empty SID.
    TString userName = CreateGuidAsString();
    const auto& userToken = SessionCtx->GetUserToken();
    if (userToken && userToken->GetUserSID()) {
        userName = userToken->GetUserSID();
    }

    auto& querySetup = *FederatedQuerySetup;
    auto [ytState, statWriter] = CreateYtNativeState(
        querySetup.YtGateway,
        userName,
        SessionId,
        &querySetup.YtGatewayConfig,
        TypesCtx);

    // NOTE(review): PassiveExecution presumably keeps the YT provider from executing
    // operations on its own inside KQP — confirm against the YT provider state definition.
    ytState->PassiveExecution = true;

    // The session is opened on the gateway held by the state, using the same
    // session id and user name that were passed to CreateYtNativeState.
    ytState->Gateway->OpenSession(
        IYtGateway::TOpenSessionOptions(SessionId)
            .UserName(userName)
            .RandomProvider(TAppData::RandomProvider)
            .TimeProvider(TAppData::TimeProvider)
            .StatWriter(statWriter)
    );

    // Source and sink share the same provider state.
    TypesCtx->AddDataSource(YtProviderName, CreateYtDataSource(ytState));
    TypesCtx->AddDataSink(YtProviderName, CreateYtDataSink(ytState));
}

void InitPgProvider() {
auto state = MakeIntrusive<NYql::TPgState>();
state->Types = TypesCtx.Get();
Expand Down Expand Up @@ -1577,6 +1602,9 @@ class TKqpHost : public IKqpHost {
if (addExternalDataSources && FederatedQuerySetup) {
InitS3Provider(queryType);
InitGenericProvider();
if (FederatedQuerySetup->YtGatewayConfig.ClusterMappingSize()) {
InitYtProvider();
}
}

InitPgProvider();
Expand Down Expand Up @@ -1619,7 +1647,7 @@ class TKqpHost : public IKqpHost {
.AddPreTypeAnnotation()
.AddExpressionEvaluation(*FuncRegistry)
.Add(new TFailExpressionEvaluation(), "FailExpressionEvaluation")
.AddIOAnnotation()
.AddIOAnnotation(false)
.AddTypeAnnotation()
.Add(TCollectParametersTransformer::Sync(SessionCtx->QueryPtr()), "CollectParameters")
.AddPostTypeAnnotation()
Expand Down Expand Up @@ -1674,6 +1702,7 @@ class TKqpHost : public IKqpHost {
private:
TIntrusivePtr<IKqpGateway> Gateway;
TString Cluster;
TString SessionId;
THolder<TExprContext> ExprCtx;
IModuleResolver::TPtr ModuleResolver;
bool KeepConfigChanges;
Expand Down Expand Up @@ -1722,10 +1751,10 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem)
TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, TString sessionId)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem);
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, sessionId);
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/);
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/, TString sessionId = "");

} // namespace NKqp
} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_translate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ NSQLTranslation::TTranslationSettings GetTranslationSettings(NYql::EKikimrQueryT
if (isEnableExternalDataSources) {
settings.DynamicClusterProvider = NYql::KikimrProviderName;
settings.BindingsMode = bindingsMode;
settings.SaveWorldDependencies = true;
}

settings.InferSyntaxVersion = true;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/node_service/kqp_node_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class KqpNode : public TTestBase {
Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
WaitForBootstrap();

auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr});
auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup);
auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory);
KqpNodeActorId = Runtime->Register(kqpNode);
Expand Down
Loading

0 comments on commit b252346

Please sign in to comment.