From b3aabaced86a45523ecc947d61f0414d79d32e98 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 7 Dec 2024 16:34:06 +0000 Subject: [PATCH] Fixed topic_filter_ut.cpp --- .../format_handler/filters/filters_set.cpp | 2 +- .../filters/purecalc_filter.cpp | 2 +- .../format_handler/ut/common/ya.make | 2 +- .../format_handler/ut/topic_filter_ut.cpp | 111 +++++++++++++++--- .../row_dispatcher/format_handler/ut/ya.make | 2 +- .../purecalc_compilation/compile_service.cpp | 7 ++ .../purecalc_compilation/ya.make | 1 + 7 files changed, 104 insertions(+), 23 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index e023fb6a0aa9..5542f044266e 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -155,7 +155,7 @@ class TTopicFiltersSet : public ITopicFiltersSet { } void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) override { - LOG_ROW_DISPATCHER_TRACE("Got compile response for reauest with id " << ev->Cookie); + LOG_ROW_DISPATCHER_TRACE("Got compile response for request with id " << ev->Cookie); const auto requestIt = InFlightCompilations.find(ev->Cookie); if (requestIt == InFlightCompilations.end()) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp index 15e73f67a1e6..93ad5dbbf3a7 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp @@ -321,7 +321,7 @@ class TPurecalcFilter : public IPurecalcFilter { str << "PRAGMA config.flags(\"LLVM\", \"" << (PurecalcSettings.EnabledLLVM ? "ON" : "OFF") << "\");\n"; str << "SELECT " << OFFSET_FIELD_NAME << " FROM Input " << Consumer->GetWhereFilter() << ";\n"; - LOG_ROW_DISPATCHER_DEBUG("Generated sql: " << str.Str()); + LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << str.Str()); return str.Str(); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ya.make b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ya.make index 47a124a9a618..11a0802e50b6 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ya.make @@ -19,8 +19,8 @@ PEERDIR( ydb/library/yql/dq/common yql/essentials/minikql - yql/essentials/minikql/invoke_builtins yql/essentials/minikql/computation + yql/essentials/minikql/invoke_builtins yql/essentials/providers/common/schema/mkql ) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 9db0c5362725..af2794b0a423 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -1,6 +1,6 @@ #include -#include #include +#include #include @@ -10,6 +10,7 @@ namespace { class TFiterFixture : public TBaseFixture { public: + using TBase = TBaseFixture; using TCallback = std::function; class TFilterConsumer : public IFilteredDataConsumer { @@ -17,10 +18,11 @@ class TFiterFixture : public TBaseFixture { using TPtr = TIntrusivePtr; public: - TFilterConsumer(const TVector& columns, const TString& whereFilter, TCallback callback) + TFilterConsumer(const TVector& columns, const TString& whereFilter, TCallback callback, std::optional> compileError) : Columns(columns) , WhereFilter(whereFilter) , Callback(callback) + , CompileError(compileError) {} public: @@ -32,33 +34,60 @@ class TFiterFixture : public TBaseFixture { return WhereFilter; } - IPureCalcProgramFactory::TSettings GetPurecalcSettings() const override { + TPurecalcCompileSettings GetPurecalcSettings() const override { return {.EnabledLLVM = false}; } - virtual NActors::TActorId GetFilterId() const override { + NActors::TActorId GetFilterId() const override { return FilterId; } - virtual const TVector& GetColumnIds() const override { + const TVector& GetColumnIds() const override { return ColumnIds; } + TMaybe GetNextMessageOffset() const override { + return Nothing(); + } + + void OnFilterStarted() override { + Started = true; + UNIT_ASSERT_C(!CompileError, "Expected compile error: " << CompileError->second); + } + + void OnFilteringError(TStatus status) override { + if (CompileError) { + Started = true; + CheckError(status, CompileError->first, CompileError->second); + } else { + UNIT_FAIL("Filtering failed: " << status.ToString()); + } + } + void OnFilteredData(ui64 rowId) override { + UNIT_ASSERT_C(Started, "Unexpected data for not started filter"); Callback(rowId); } - + protected: NActors::TActorId FilterId; TVector ColumnIds; + bool Started = false; private: const TVector Columns; const TString WhereFilter; const TCallback Callback; + const std::optional> CompileError; }; public: + virtual void SetUp(NUnitTest::TTestContext& ctx) override { + TBase::SetUp(ctx); + + CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService()); + } + virtual void TearDown(NUnitTest::TTestContext&) override { with_lock (Alloc) { for (auto& holder : Holders) { @@ -74,12 +103,14 @@ class TFiterFixture : public TBaseFixture { public: virtual TStatus MakeFilter(const TVector& columns, const TString& whereFilter, TCallback callback) { - FilterHandler = MakeIntrusive(columns, whereFilter, callback); + FilterHandler = MakeIntrusive(columns, whereFilter, callback, CompileError); - auto filterStatus = CreatePurecalcFilter(FilterHandler, {.PureCalcProgramFactory = PureCalcProgramFactory}); + auto filterStatus = CreatePurecalcFilter(FilterHandler); if (filterStatus.IsSuccess()) { Filter = std::move(filterStatus.GetValue()); + CompileFilter(); } + return filterStatus; } @@ -118,10 +149,29 @@ class TFiterFixture : public TBaseFixture { }); } +private: + void CompileFilter() { + const auto edgeActor = Runtime.AllocateEdgeActor(); + Runtime.Send(CompileServiceActorId, edgeActor, Filter->GetCompileRequest().release()); + auto response = Runtime.GrabEdgeEvent(edgeActor, TDuration::Seconds(5)); + + UNIT_ASSERT_C(response, "Failed to get compile response"); + if (!CompileError) { + UNIT_ASSERT_C(response->Get()->ProgramHolder, "Failed to compile program, error: " << response->Get()->Issues.ToOneLineString()); + Filter->OnCompileResponse(std::move(response)); + FilterHandler->OnFilterStarted(); + } else { + CheckError(TStatus(response->Get()->Status, response->Get()->Issues), CompileError->first, CompileError->second); + } + } + public: + NActors::TActorId CompileServiceActorId; TFilterConsumer::TPtr FilterHandler; IPurecalcFilter::TPtr Filter; TList> Holders; + + std::optional> CompileError; }; class TFilterSetFixture : public TFiterFixture { @@ -131,20 +181,27 @@ class TFilterSetFixture : public TFiterFixture { class TFilterSetConsumer : public TFilterConsumer { public: using TBase = TFilterConsumer; + using TPtr = TIntrusivePtr; public: - TFilterSetConsumer(NActors::TActorId filterId, const TVector& columnIds, const TVector& columns, const TString& whereFilter, TCallback callback) - : TBase(columns, whereFilter, callback) + TFilterSetConsumer(NActors::TActorId filterId, const TVector& columnIds, const TVector& columns, const TString& whereFilter, TCallback callback, std::optional> compileError) + : TBase(columns, whereFilter, callback, compileError) { FilterId = filterId; ColumnIds = columnIds; } + + bool IsStarted() const { + return Started; + } }; public: void SetUp(NUnitTest::TTestContext& ctx) override { TBase::SetUp(ctx); - FiltersSet = CreateTopicFiltersSet({.PureCalcConfig = {.PureCalcProgramFactory = PureCalcProgramFactory}}); + + CompileNotifier = Runtime.AllocateEdgeActor(); + FiltersSet = CreateTopicFiltersSet(CompileNotifier, {.CompileServiceId = CompileServiceActorId}, MakeIntrusive()); } void TearDown(NUnitTest::TTestContext& ctx) override { @@ -167,17 +224,31 @@ class TFilterSetFixture : public TFiterFixture { } FilterIds.emplace_back(FilterIds.size(), 0, 0, 0); - auto filterSetHandler = MakeIntrusive(FilterIds.back(), columnIds, columns, whereFilter, callback); - return FiltersSet->AddFilter(filterSetHandler); + auto filterSetHandler = MakeIntrusive(FilterIds.back(), columnIds, columns, whereFilter, callback, CompileError); + if (auto status = FiltersSet->AddFilter(filterSetHandler)) { + return status; + } + + if (!filterSetHandler->IsStarted()) { + // Wait filter compilation + auto response = Runtime.GrabEdgeEvent(CompileNotifier, TDuration::Seconds(5)); + UNIT_ASSERT_C(response, "Compilation is not performed for filter: " << whereFilter); + FiltersSet->OnCompileResponse(std::move(response)); + } + + return TStatus(); } void FilterData(const TVector& columnIndex, const TVector*>& values, ui64 numberRows = 0) { - FiltersSet->FilterData(columnIndex, values, numberRows ? numberRows : values.front()->size()); + numberRows = numberRows ? numberRows : values.front()->size(); + FiltersSet->FilterData(columnIndex, TVector(numberRows, std::numeric_limits::max()), values, numberRows); } public: TVector FilterIds; std::unordered_map ColumnIndex; + + NActors::TActorId CompileNotifier; ITopicFiltersSet::TPtr FiltersSet; }; @@ -270,11 +341,12 @@ Y_UNIT_TEST_SUITE(TestPurecalcFilter) { } Y_UNIT_TEST_F(CompilationValidation, TFiterFixture) { - CheckError(MakeFilter( + CompileError = {TStatus::EId::INTERNAL_ERROR, "Failed to compile purecalc program subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"}; + MakeFilter( {{"a1", "[DataType; String]"}}, "where a2 ... 50", [&](ui64 offset) {} - ), TStatus::EId::INTERNAL_ERROR, "Failed to compile predicate 'where a2 ... 50' for purecalc filter subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"); + ); } } @@ -332,18 +404,19 @@ Y_UNIT_TEST_SUITE(TestFilterSet) { )); CheckError( - FiltersSet->AddFilter(MakeIntrusive(FilterIds.back(), TVector(), TVector(), TString(), [&](ui64 offset) {})), + FiltersSet->AddFilter(MakeIntrusive(FilterIds.back(), TVector(), TVector(), TString(), [&](ui64 offset) {}, CompileError)), TStatus::EId::INTERNAL_ERROR, "Failed to create new filter, filter with id [0:0:0] already exists" ); } Y_UNIT_TEST_F(CompilationValidation, TFilterSetFixture) { - CheckError(MakeFilter( + CompileError = {TStatus::EId::INTERNAL_ERROR, "Filed to compile purecalc program subissue: {
: Error: Failed to compile purecalc program subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"}; + MakeFilter( {{"a1", "[DataType; String]"}}, "where a2 ... 50", [&](ui64 offset) {} - ), TStatus::EId::INTERNAL_ERROR, "Failed to compile predicate 'where a2 ... 50' for purecalc filter subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"); + ); } } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/ya.make b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/ya.make index 7c7991003af5..03f73e9c21dd 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/ya.make @@ -2,7 +2,7 @@ UNITTEST_FOR(ydb/core/fq/libs/row_dispatcher/format_handler) SRCS( # format_handler_ut.cpp - # topic_filter_ut.cpp + topic_filter_ut.cpp topic_parser_ut.cpp ) diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp index e81251d17a25..a17237a501a6 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp @@ -1,5 +1,6 @@ #include "compile_service.h" +#include #include #include @@ -17,6 +18,7 @@ class TPurecalcCompileService : public NActors::TActor public: TPurecalcCompileService() : TBase(&TPurecalcCompileService::StateFunc) + , LogPrefix("TPurecalcCompileService: ") {} STRICT_STFUNC(StateFunc, @@ -24,6 +26,7 @@ class TPurecalcCompileService : public NActors::TActor ) void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { + LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie); IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder); TStatus status; @@ -39,8 +42,10 @@ class TPurecalcCompileService : public NActors::TActor } if (!status.IsSuccess()) { + LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie); Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetIssues()), 0, ev->Cookie); } else { + LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie); Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie); } } @@ -58,6 +63,8 @@ class TPurecalcCompileService : public NActors::TActor } private: + const TString LogPrefix; + std::map ProgramFactories; }; diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make index 784e3b60c6d1..a1508a8591b2 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/ya.make @@ -5,6 +5,7 @@ SRCS( ) PEERDIR( + ydb/core/fq/libs/actors/logging ydb/core/fq/libs/row_dispatcher/events ydb/core/fq/libs/row_dispatcher/format_handler/common ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper