From d82c9ad5595510629fea2aa1f037fd18ed7ae4db Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Mon, 1 Jul 2024 21:18:50 +0300 Subject: [PATCH] fix saveload logic & support loading custom udfs (#6144) --- ydb/tools/query_replay_yt/main.cpp | 55 +++++++++++++++++--- ydb/tools/query_replay_yt/query_compiler.cpp | 13 ++++- ydb/tools/query_replay_yt/query_replay.cpp | 3 +- ydb/tools/query_replay_yt/query_replay.h | 1 + 4 files changed, 63 insertions(+), 9 deletions(-) diff --git a/ydb/tools/query_replay_yt/main.cpp b/ydb/tools/query_replay_yt/main.cpp index 55b9a092608c..c9237e84a245 100644 --- a/ydb/tools/query_replay_yt/main.cpp +++ b/ydb/tools/query_replay_yt/main.cpp @@ -13,8 +13,28 @@ #include +#include + using namespace NActors; +TVector> GetJobFiles(TVector udfs) { + TVector> result; + + for(const TString& udf: udfs) { + TVector splitResult; + Split(udf.data(), "/", splitResult); + while(!splitResult.empty() && splitResult.back().empty()) { + splitResult.pop_back(); + } + + Y_ENSURE(!splitResult.empty()); + + result.push_back(std::make_pair(udf, splitResult.back())); + } + + return result; +} + class TQueryReplayMapper : public NYT::IMapper, NYT::TTableWriter> { @@ -25,7 +45,8 @@ class TQueryReplayMapper TIntrusivePtr FunctionRegistry; TIntrusivePtr ModuleResolverState; - TQueryReplayConfig Config; + TVector UdfFiles; + ui32 ActorSystemThreadsCount = 5; TString GetFailReason(const TQueryReplayEvents::TCheckQueryPlanStatus& status) { switch (status) { @@ -58,16 +79,30 @@ class TQueryReplayMapper public: TQueryReplayMapper() = default; - TQueryReplayMapper(const TQueryReplayConfig& config) : Config(config) { - } + + Y_SAVELOAD_JOB(UdfFiles, ActorSystemThreadsCount); + + TQueryReplayMapper(TVector udfFiles, ui32 actorSystemThreadsCount) + : UdfFiles(udfFiles) + , ActorSystemThreadsCount(actorSystemThreadsCount) + {} void Start(NYT::TTableWriter*) override { TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry()); FunctionRegistry.Reset(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone()); NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry); + NKikimr::NMiniKQL::TUdfModuleRemappings remappings; + THashSet usedUdfPaths; + + for(const auto& [_, udfPath]: GetJobFiles(UdfFiles)) { + if (usedUdfPaths.insert(udfPath).second) { + FunctionRegistry->LoadUdfs(udfPath, remappings, 0); + } + } + AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr)); AppData->Counters = MakeIntrusive(new NMonitoring::TDynamicCounters()); - auto setup = BuildActorSystemSetup(Config.ActorSystemThreadsCount); + auto setup = BuildActorSystemSetup(ActorSystemThreadsCount); ActorSystem.Reset(new TActorSystem(setup, AppData.Get())); ActorSystem->Start(); ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr)); @@ -164,9 +199,17 @@ int main(int argc, const char** argv) { NYT::TMapOperationSpec spec; spec.AddInput(config.SrcPath); spec.AddOutput(NYT::TRichYPath(config.DstPath).Schema(OutputSchema())); - spec.MapperSpec(NYT::TUserJobSpec().MemoryLimit(5_GB)); - client->Map(spec, new TQueryReplayMapper(config)); + auto userJobSpec = NYT::TUserJobSpec(); + userJobSpec.MemoryLimit(1_GB); + + for(const auto& [udf, udfInJob]: GetJobFiles(config.UdfFiles)) { + userJobSpec.AddLocalFile(udf, NYT::TAddLocalFileOptions().PathInJob(udfInJob)); + } + + spec.MapperSpec(userJobSpec); + + client->Map(spec, new TQueryReplayMapper(config.UdfFiles, config.ActorSystemThreadsCount)); return EXIT_SUCCESS; } diff --git a/ydb/tools/query_replay_yt/query_compiler.cpp b/ydb/tools/query_replay_yt/query_compiler.cpp index b5d69560a834..64cd4aecfd07 100644 --- a/ydb/tools/query_replay_yt/query_compiler.cpp +++ b/ydb/tools/query_replay_yt/query_compiler.cpp @@ -221,6 +221,7 @@ class TReplayCompileActor: public TActorBootstrapped { , Config(MakeIntrusive()) , FunctionRegistry(functionRegistry) { + Config->EnableKqpScanQueryStreamLookup = true; } void Bootstrap() { @@ -278,8 +279,16 @@ class TReplayCompileActor: public TActorBootstrapped { case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: AsyncCompileResult = KqpHost->PrepareGenericScript(Query->Text, prepareSettings); break; - case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: - case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: { + prepareSettings.ConcurrentResults = false; + AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr); + break; + } + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: { + AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr); + break; + } + default: YQL_ENSURE(false, "Unexpected query type: " << Query->Settings.QueryType); } diff --git a/ydb/tools/query_replay_yt/query_replay.cpp b/ydb/tools/query_replay_yt/query_replay.cpp index 1cbfedc05739..ddc1b4417f76 100644 --- a/ydb/tools/query_replay_yt/query_replay.cpp +++ b/ydb/tools/query_replay_yt/query_replay.cpp @@ -17,7 +17,8 @@ void TQueryReplayConfig::ParseConfig(int argc, const char** argv) { opts.AddLongOption("cluster", "YT cluster").StoreResult(&Cluster).Required(); opts.AddLongOption("src-path", "Source table path").StoreResult(&SrcPath).Required(); opts.AddLongOption("dst-path", "Target table path").StoreResult(&DstPath).Required(); - opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&DstPath); + opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&ActorSystemThreadsCount); + opts.AddLongOption("udf-file", "UDFS to load").AppendTo(&UdfFiles); NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); } diff --git a/ydb/tools/query_replay_yt/query_replay.h b/ydb/tools/query_replay_yt/query_replay.h index 062a966925e5..10cfff2837d9 100644 --- a/ydb/tools/query_replay_yt/query_replay.h +++ b/ydb/tools/query_replay_yt/query_replay.h @@ -18,6 +18,7 @@ struct TQueryReplayConfig { TString SrcPath; TString DstPath; ui32 ActorSystemThreadsCount = 5; + TVector UdfFiles; void ParseConfig(int argc, const char** argv); };