Skip to content

Commit

Permalink
fix saveload logic & support loading custom udfs (ydb-platform#6144)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jul 1, 2024
1 parent a48b189 commit d82c9ad
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 9 deletions.
55 changes: 49 additions & 6 deletions ydb/tools/query_replay_yt/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,28 @@

#include <yt/cpp/mapreduce/interface/logging/logger.h>

#include <util/string/split.h>

using namespace NActors;

TVector<std::pair<TString, TString>> GetJobFiles(TVector<TString> udfs) {
TVector<std::pair<TString, TString>> result;

for(const TString& udf: udfs) {
TVector<TString> splitResult;
Split(udf.data(), "/", splitResult);
while(!splitResult.empty() && splitResult.back().empty()) {
splitResult.pop_back();
}

Y_ENSURE(!splitResult.empty());

result.push_back(std::make_pair(udf, splitResult.back()));
}

return result;
}

class TQueryReplayMapper
: public NYT::IMapper<NYT::TTableReader<NYT::TNode>, NYT::TTableWriter<NYT::TNode>>
{
Expand All @@ -25,7 +45,8 @@ class TQueryReplayMapper
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;

TQueryReplayConfig Config;
TVector<TString> UdfFiles;
ui32 ActorSystemThreadsCount = 5;

TString GetFailReason(const TQueryReplayEvents::TCheckQueryPlanStatus& status) {
switch (status) {
Expand Down Expand Up @@ -58,16 +79,30 @@ class TQueryReplayMapper

public:
TQueryReplayMapper() = default;
TQueryReplayMapper(const TQueryReplayConfig& config) : Config(config) {
}

Y_SAVELOAD_JOB(UdfFiles, ActorSystemThreadsCount);

TQueryReplayMapper(TVector<TString> udfFiles, ui32 actorSystemThreadsCount)
: UdfFiles(udfFiles)
, ActorSystemThreadsCount(actorSystemThreadsCount)
{}

void Start(NYT::TTableWriter<NYT::TNode>*) override {
TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry());
FunctionRegistry.Reset(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone());
NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry);
NKikimr::NMiniKQL::TUdfModuleRemappings remappings;
THashSet<TString> usedUdfPaths;

for(const auto& [_, udfPath]: GetJobFiles(UdfFiles)) {
if (usedUdfPaths.insert(udfPath).second) {
FunctionRegistry->LoadUdfs(udfPath, remappings, 0);
}
}

AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr));
AppData->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>(new NMonitoring::TDynamicCounters());
auto setup = BuildActorSystemSetup(Config.ActorSystemThreadsCount);
auto setup = BuildActorSystemSetup(ActorSystemThreadsCount);
ActorSystem.Reset(new TActorSystem(setup, AppData.Get()));
ActorSystem->Start();
ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr));
Expand Down Expand Up @@ -164,9 +199,17 @@ int main(int argc, const char** argv) {
NYT::TMapOperationSpec spec;
spec.AddInput<NYT::TNode>(config.SrcPath);
spec.AddOutput<NYT::TNode>(NYT::TRichYPath(config.DstPath).Schema(OutputSchema()));
spec.MapperSpec(NYT::TUserJobSpec().MemoryLimit(5_GB));

client->Map(spec, new TQueryReplayMapper(config));
auto userJobSpec = NYT::TUserJobSpec();
userJobSpec.MemoryLimit(1_GB);

for(const auto& [udf, udfInJob]: GetJobFiles(config.UdfFiles)) {
userJobSpec.AddLocalFile(udf, NYT::TAddLocalFileOptions().PathInJob(udfInJob));
}

spec.MapperSpec(userJobSpec);

client->Map(spec, new TQueryReplayMapper(config.UdfFiles, config.ActorSystemThreadsCount));

return EXIT_SUCCESS;
}
13 changes: 11 additions & 2 deletions ydb/tools/query_replay_yt/query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
, Config(MakeIntrusive<TKikimrConfiguration>())
, FunctionRegistry(functionRegistry)
{
Config->EnableKqpScanQueryStreamLookup = true;
}

void Bootstrap() {
Expand Down Expand Up @@ -278,8 +279,16 @@ class TReplayCompileActor: public TActorBootstrapped<TReplayCompileActor> {
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
AsyncCompileResult = KqpHost->PrepareGenericScript(Query->Text, prepareSettings);
break;
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: {
prepareSettings.ConcurrentResults = false;
AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr);
break;
}
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: {
AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr);
break;
}

default:
YQL_ENSURE(false, "Unexpected query type: " << Query->Settings.QueryType);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/tools/query_replay_yt/query_replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ void TQueryReplayConfig::ParseConfig(int argc, const char** argv) {
opts.AddLongOption("cluster", "YT cluster").StoreResult(&Cluster).Required();
opts.AddLongOption("src-path", "Source table path").StoreResult(&SrcPath).Required();
opts.AddLongOption("dst-path", "Target table path").StoreResult(&DstPath).Required();
opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&DstPath);
opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&ActorSystemThreadsCount);
opts.AddLongOption("udf-file", "UDFS to load").AppendTo(&UdfFiles);

NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/tools/query_replay_yt/query_replay.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct TQueryReplayConfig {
TString SrcPath;
TString DstPath;
ui32 ActorSystemThreadsCount = 5;
TVector<TString> UdfFiles;

void ParseConfig(int argc, const char** argv);
};
Expand Down

0 comments on commit d82c9ad

Please sign in to comment.