Skip to content

Commit

Permalink
KqpRun add results into clear execution (ydb-platform#1661)
Browse files Browse the repository at this point in the history
* Added results into clear execution

* Added short flags
  • Loading branch information
GrigoriyPA authored Feb 8, 2024
1 parent faf3e89 commit 8c72ada
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 62 deletions.
3 changes: 3 additions & 0 deletions ydb/tests/tools/kqprun/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
sync_dir
example
udfs
*.log
*.sql
*.bin
76 changes: 50 additions & 26 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "src/kqp_runner.h"

#include <cstdio>

#include <contrib/libs/protobuf/src/google/protobuf/text_format.h>

#include <library/cpp/colorizer/colors.h>
Expand All @@ -22,7 +24,7 @@ struct TExecutionOptions {
TString ScriptTraceId = "kqprun";

bool HasResults() const {
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE && !ClearExecution;
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
}
};

Expand All @@ -46,6 +48,9 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
ythrow yexception() << "Script execution failed";
}
if (!runner.FetchScriptResults()) {
ythrow yexception() << "Fetch script results failed";
}
} else {
if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
ythrow yexception() << "Query execution failed";
Expand All @@ -54,10 +59,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
}

if (executionOptions.HasResults()) {
Cout << colors.Yellow() << "Writing script results..." << colors.Default() << Endl;
if (!runner.WriteScriptResults()) {
ythrow yexception() << "Writing script results failed";
}
runner.PrintScriptResults();
}
}

Expand All @@ -74,6 +76,20 @@ THolder<TFileOutput> SetupDefaultFileOutput(const TString& filePath, IOutputStre
}


TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry(const TString& udfsDirectory, TVector<TString> udfsPaths) {
if (!udfsDirectory.empty() || !udfsPaths.empty()) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
Cout << colors.Yellow() << "Fetching udfs..." << colors.Default() << Endl;
}

NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths);
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone();
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);

return functionRegistry;
}


void RunMain(int argc, const char* argv[]) {
TExecutionOptions executionOptions;
NKqpRun::TRunnerOptions runnerOptions;
Expand All @@ -87,9 +103,11 @@ void RunMain(int argc, const char* argv[]) {
TString logFile = "-";
TString appConfigFile = "./configuration/app_config.conf";

TString traceOptType = "disabled";
TString scriptQueryAction = "execute";
TString planOutputFormat = "pretty";
TString resultOutputFormat = "rows";
i64 resultsRowsLimit = 1000;

TVector<TString> udfsPaths;
TString udfsDirectory;
Expand All @@ -103,7 +121,7 @@ void RunMain(int argc, const char* argv[]) {
.Optional()
.RequiredArgument("FILE")
.StoreResult(&schemeQueryFile);
options.AddLongOption("app-config", "File with app config (TAppConfig)")
options.AddLongOption('c', "app-config", "File with app config (TAppConfig)")
.Optional()
.RequiredArgument("FILE")
.DefaultValue(appConfigFile)
Expand Down Expand Up @@ -135,33 +153,33 @@ void RunMain(int argc, const char* argv[]) {
.NoArgument()
.DefaultValue(executionOptions.ClearExecution)
.SetFlag(&executionOptions.ClearExecution);
options.AddLongOption("trace-opt", "print AST in the begin of each transformation")
options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation, one of { scheme | script | all }")
.Optional()
.NoArgument()
.DefaultValue(runnerOptions.YdbSettings.TraceOpt)
.SetFlag(&runnerOptions.YdbSettings.TraceOpt);
options.AddLongOption("script-action", "Script query execute action, one of { execute | explain }")
.RequiredArgument("STR")
.DefaultValue(traceOptType)
.StoreResult(&traceOptType);
options.AddLongOption('A', "script-action", "Script query execute action, one of { execute | explain }")
.Optional()
.RequiredArgument("STR")
.DefaultValue(scriptQueryAction)
.StoreResult(&scriptQueryAction);
options.AddLongOption("plan-format", "Script query plan format, one of { pretty | table | json }")
options.AddLongOption('P', "plan-format", "Script query plan format, one of { pretty | table | json }")
.Optional()
.RequiredArgument("STR")
.DefaultValue(planOutputFormat)
.StoreResult(&planOutputFormat);
options.AddLongOption("result-format", "Script query result format, one of { rows | full }")
options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full }")
.Optional()
.RequiredArgument("STR")
.DefaultValue(resultOutputFormat)
.StoreResult(&resultOutputFormat);
options.AddLongOption("result-rows-limit", "Rows limit for script execution results")
options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
.Optional()
.RequiredArgument("INT")
.DefaultValue(runnerOptions.ResultsRowsLimit)
.StoreResult(&runnerOptions.ResultsRowsLimit);
.DefaultValue(resultsRowsLimit)
.StoreResult(&resultsRowsLimit);

options.AddLongOption("udf", "Load shared library with UDF by given path")
options.AddLongOption('u', "udf", "Load shared library with UDF by given path")
.Optional()
.RequiredArgument("FILE")
.AppendTo(&udfsPaths);
Expand Down Expand Up @@ -191,15 +209,19 @@ void RunMain(int argc, const char* argv[]) {

// Runner options

if (runnerOptions.ResultsRowsLimit < 0) {
ythrow yexception() << "Results rows limit less than zero";
}

THolder<TFileOutput> resultFileHolder = SetupDefaultFileOutput(resultOutputFile, runnerOptions.ResultOutput);
THolder<TFileOutput> schemeQueryAstFileHolder = SetupDefaultFileOutput(schemeQueryAstFile, runnerOptions.SchemeQueryAstOutput);
THolder<TFileOutput> scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput);
THolder<TFileOutput> scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput);

runnerOptions.TraceOptType =
(traceOptType == TStringBuf("all")) ? NKqpRun::TRunnerOptions::ETraceOptType::All
: (traceOptType == TStringBuf("scheme")) ? NKqpRun::TRunnerOptions::ETraceOptType::Scheme
: (traceOptType == TStringBuf("script")) ? NKqpRun::TRunnerOptions::ETraceOptType::Script
: (traceOptType == TStringBuf("disabled")) ? NKqpRun::TRunnerOptions::ETraceOptType::Disabled
: NKqpRun::TRunnerOptions::ETraceOptType::All;
runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled;

runnerOptions.ResultOutputFormat =
(resultOutputFormat == TStringBuf("rows")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson
: (resultOutputFormat == TStringBuf("full")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson
Expand All @@ -215,20 +237,22 @@ void RunMain(int argc, const char* argv[]) {

if (logFile != "-") {
runnerOptions.YdbSettings.LogOutputFile = logFile;
std::remove(logFile.c_str());
}

runnerOptions.YdbSettings.YqlToken = GetEnv("YQL_TOKEN");

NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths);
auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone();
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);
runnerOptions.YdbSettings.FunctionRegistry = functionRegistry.Get();
runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths).Get();

TString appConfigData = TFileInput(appConfigFile).ReadAll();
if (!google::protobuf::TextFormat::ParseFromString(appConfigData, &runnerOptions.YdbSettings.AppConfig)) {
ythrow yexception() << "Bad format of app configuration";
}

if (resultsRowsLimit < 0) {
ythrow yexception() << "Results rows limit less than zero";
}
runnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(resultsRowsLimit);

RunScript(executionOptions, runnerOptions);
}

Expand Down
43 changes: 38 additions & 5 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,19 @@ namespace {

class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMock> {
public:
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise, ui64 resultSizeLimit)
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets)
: Request_(std::move(request))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
, ResultSizeLimit_(std::numeric_limits<i64>::max())
, ResultSets_(resultSets)
{
if (resultSizeLimit && resultSizeLimit < std::numeric_limits<i64>::max()) {
if (resultRowsLimit) {
ResultRowsLimit_ = resultRowsLimit;
}
if (resultSizeLimit) {
ResultSizeLimit_ = resultSizeLimit;
}
}
Expand All @@ -36,6 +43,28 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
response->Record.SetFreeSpace(ResultSizeLimit_);

auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
if (resultSetIndex >= ResultSets_.size()) {
ResultSets_.resize(resultSetIndex + 1);
}

if (!ResultSets_[resultSetIndex].truncated()) {
for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) {
if (static_cast<ui64>(ResultSets_[resultSetIndex].rows_size()) >= ResultRowsLimit_) {
ResultSets_[resultSetIndex].set_truncated(true);
break;
}

if (ResultSets_[resultSetIndex].ByteSizeLong() + row.ByteSizeLong() > ResultSizeLimit_) {
ResultSets_[resultSetIndex].set_truncated(true);
break;
}

*ResultSets_[resultSetIndex].add_rows() = std::move(row);
}
*ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns();
}

Send(ev->Sender, response.Release());
}

Expand All @@ -47,13 +76,17 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
private:
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
i64 ResultSizeLimit_;
ui64 ResultRowsLimit_;
ui64 ResultSizeLimit_;
std::vector<Ydb::ResultSet>& ResultSets_;
};

} // anonymous namespace

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise, ui64 resultSizeLimit) {
return new TRunScriptActorMock(std::move(request), promise, resultSizeLimit);
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets);
}

} // namespace NKqpRun
4 changes: 3 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace NKqpRun {

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request, NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise, ui64 resultSizeLimit);
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets);

} // namespace NKqpRun
14 changes: 10 additions & 4 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,36 @@ namespace NKqpRun {
struct TYdbSetupSettings {
TString DomainName = "Root";

bool TraceOpt = false;
bool TraceOptEnabled = false;
TMaybe<TString> LogOutputFile;

TString YqlToken;
NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry = nullptr;
NKikimrConfig::TAppConfig AppConfig;
};


struct TRunnerOptions {
enum class ETraceOptType {
Disabled,
Scheme,
Script,
All,
};

enum class EResultOutputFormat {
RowsJson, // Rows in json format
FullJson, // Columns, rows and types in json format
};

i64 ResultsRowsLimit = 1000;

IOutputStream* ResultOutput = &Cout;
IOutputStream* SchemeQueryAstOutput = nullptr;
IOutputStream* ScriptQueryAstOutput = nullptr;
IOutputStream* ScriptQueryPlanOutput = nullptr;

EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;
ETraceOptType TraceOptType = ETraceOptType::Disabled;

TYdbSetupSettings YdbSettings;
};
Expand Down
Loading

0 comments on commit 8c72ada

Please sign in to comment.