Skip to content

Commit

Permalink
Fixed topic_filter_ut.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 7, 2024
1 parent f741eff commit b3aabac
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class TTopicFiltersSet : public ITopicFiltersSet {
}

void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr ev) override {
LOG_ROW_DISPATCHER_TRACE("Got compile response for reauest with id " << ev->Cookie);
LOG_ROW_DISPATCHER_TRACE("Got compile response for request with id " << ev->Cookie);

const auto requestIt = InFlightCompilations.find(ev->Cookie);
if (requestIt == InFlightCompilations.end()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ class TPurecalcFilter : public IPurecalcFilter {
str << "PRAGMA config.flags(\"LLVM\", \"" << (PurecalcSettings.EnabledLLVM ? "ON" : "OFF") << "\");\n";
str << "SELECT " << OFFSET_FIELD_NAME << " FROM Input " << Consumer->GetWhereFilter() << ";\n";

LOG_ROW_DISPATCHER_DEBUG("Generated sql: " << str.Str());
LOG_ROW_DISPATCHER_DEBUG("Generated sql:\n" << str.Str());
return str.Str();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ PEERDIR(
ydb/library/yql/dq/common

yql/essentials/minikql
yql/essentials/minikql/invoke_builtins
yql/essentials/minikql/computation
yql/essentials/minikql/invoke_builtins
yql/essentials/providers/common/schema/mkql
)

Expand Down
111 changes: 92 additions & 19 deletions ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h>
#include <ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.h>

#include <yql/essentials/minikql/mkql_string_util.h>

Expand All @@ -10,17 +10,19 @@ namespace {

class TFiterFixture : public TBaseFixture {
public:
using TBase = TBaseFixture;
using TCallback = std::function<void(ui64 rowId)>;

class TFilterConsumer : public IFilteredDataConsumer {
public:
using TPtr = TIntrusivePtr<TFilterConsumer>;

public:
TFilterConsumer(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback)
TFilterConsumer(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, std::optional<std::pair<TStatus::TStatusCode, TString>> compileError)
: Columns(columns)
, WhereFilter(whereFilter)
, Callback(callback)
, CompileError(compileError)
{}

public:
Expand All @@ -32,33 +34,60 @@ class TFiterFixture : public TBaseFixture {
return WhereFilter;
}

IPureCalcProgramFactory::TSettings GetPurecalcSettings() const override {
TPurecalcCompileSettings GetPurecalcSettings() const override {
return {.EnabledLLVM = false};
}

virtual NActors::TActorId GetFilterId() const override {
NActors::TActorId GetFilterId() const override {
return FilterId;
}

virtual const TVector<ui64>& GetColumnIds() const override {
const TVector<ui64>& GetColumnIds() const override {
return ColumnIds;
}

TMaybe<ui64> GetNextMessageOffset() const override {
return Nothing();
}

void OnFilterStarted() override {
Started = true;
UNIT_ASSERT_C(!CompileError, "Expected compile error: " << CompileError->second);
}

void OnFilteringError(TStatus status) override {
if (CompileError) {
Started = true;
CheckError(status, CompileError->first, CompileError->second);
} else {
UNIT_FAIL("Filtering failed: " << status.ToString());
}
}

void OnFilteredData(ui64 rowId) override {
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
Callback(rowId);
}

protected:
NActors::TActorId FilterId;
TVector<ui64> ColumnIds;
bool Started = false;

private:
const TVector<TSchemaColumn> Columns;
const TString WhereFilter;
const TCallback Callback;
const std::optional<std::pair<TStatus::TStatusCode, TString>> CompileError;
};

public:
virtual void SetUp(NUnitTest::TTestContext& ctx) override {
TBase::SetUp(ctx);

CompileServiceActorId = Runtime.Register(CreatePurecalcCompileService());
}

virtual void TearDown(NUnitTest::TTestContext&) override {
with_lock (Alloc) {
for (auto& holder : Holders) {
Expand All @@ -74,12 +103,14 @@ class TFiterFixture : public TBaseFixture {

public:
virtual TStatus MakeFilter(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback) {
FilterHandler = MakeIntrusive<TFilterConsumer>(columns, whereFilter, callback);
FilterHandler = MakeIntrusive<TFilterConsumer>(columns, whereFilter, callback, CompileError);

auto filterStatus = CreatePurecalcFilter(FilterHandler, {.PureCalcProgramFactory = PureCalcProgramFactory});
auto filterStatus = CreatePurecalcFilter(FilterHandler);
if (filterStatus.IsSuccess()) {
Filter = std::move(filterStatus.GetValue());
CompileFilter();
}

return filterStatus;
}

Expand Down Expand Up @@ -118,10 +149,29 @@ class TFiterFixture : public TBaseFixture {
});
}

private:
void CompileFilter() {
const auto edgeActor = Runtime.AllocateEdgeActor();
Runtime.Send(CompileServiceActorId, edgeActor, Filter->GetCompileRequest().release());
auto response = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvPurecalcCompileResponse>(edgeActor, TDuration::Seconds(5));

UNIT_ASSERT_C(response, "Failed to get compile response");
if (!CompileError) {
UNIT_ASSERT_C(response->Get()->ProgramHolder, "Failed to compile program, error: " << response->Get()->Issues.ToOneLineString());
Filter->OnCompileResponse(std::move(response));
FilterHandler->OnFilterStarted();
} else {
CheckError(TStatus(response->Get()->Status, response->Get()->Issues), CompileError->first, CompileError->second);
}
}

public:
NActors::TActorId CompileServiceActorId;
TFilterConsumer::TPtr FilterHandler;
IPurecalcFilter::TPtr Filter;
TList<TVector<NYql::NUdf::TUnboxedValue>> Holders;

std::optional<std::pair<TStatus::TStatusCode, TString>> CompileError;
};

class TFilterSetFixture : public TFiterFixture {
Expand All @@ -131,20 +181,27 @@ class TFilterSetFixture : public TFiterFixture {
class TFilterSetConsumer : public TFilterConsumer {
public:
using TBase = TFilterConsumer;
using TPtr = TIntrusivePtr<TFilterSetConsumer>;

public:
TFilterSetConsumer(NActors::TActorId filterId, const TVector<ui64>& columnIds, const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback)
: TBase(columns, whereFilter, callback)
TFilterSetConsumer(NActors::TActorId filterId, const TVector<ui64>& columnIds, const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, std::optional<std::pair<TStatus::TStatusCode, TString>> compileError)
: TBase(columns, whereFilter, callback, compileError)
{
FilterId = filterId;
ColumnIds = columnIds;
}

bool IsStarted() const {
return Started;
}
};

public:
void SetUp(NUnitTest::TTestContext& ctx) override {
TBase::SetUp(ctx);
FiltersSet = CreateTopicFiltersSet({.PureCalcConfig = {.PureCalcProgramFactory = PureCalcProgramFactory}});

CompileNotifier = Runtime.AllocateEdgeActor();
FiltersSet = CreateTopicFiltersSet(CompileNotifier, {.CompileServiceId = CompileServiceActorId}, MakeIntrusive<NMonitoring::TDynamicCounters>());
}

void TearDown(NUnitTest::TTestContext& ctx) override {
Expand All @@ -167,17 +224,31 @@ class TFilterSetFixture : public TFiterFixture {
}
FilterIds.emplace_back(FilterIds.size(), 0, 0, 0);

auto filterSetHandler = MakeIntrusive<TFilterSetConsumer>(FilterIds.back(), columnIds, columns, whereFilter, callback);
return FiltersSet->AddFilter(filterSetHandler);
auto filterSetHandler = MakeIntrusive<TFilterSetConsumer>(FilterIds.back(), columnIds, columns, whereFilter, callback, CompileError);
if (auto status = FiltersSet->AddFilter(filterSetHandler)) {
return status;
}

if (!filterSetHandler->IsStarted()) {
// Wait filter compilation
auto response = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvPurecalcCompileResponse>(CompileNotifier, TDuration::Seconds(5));
UNIT_ASSERT_C(response, "Compilation is not performed for filter: " << whereFilter);
FiltersSet->OnCompileResponse(std::move(response));
}

return TStatus();
}

void FilterData(const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows = 0) {
FiltersSet->FilterData(columnIndex, values, numberRows ? numberRows : values.front()->size());
numberRows = numberRows ? numberRows : values.front()->size();
FiltersSet->FilterData(columnIndex, TVector<ui64>(numberRows, std::numeric_limits<ui64>::max()), values, numberRows);
}

public:
TVector<NActors::TActorId> FilterIds;
std::unordered_map<TString, ui64> ColumnIndex;

NActors::TActorId CompileNotifier;
ITopicFiltersSet::TPtr FiltersSet;
};

Expand Down Expand Up @@ -270,11 +341,12 @@ Y_UNIT_TEST_SUITE(TestPurecalcFilter) {
}

Y_UNIT_TEST_F(CompilationValidation, TFiterFixture) {
CheckError(MakeFilter(
CompileError = {TStatus::EId::INTERNAL_ERROR, "Failed to compile purecalc program subissue: { <main>: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: { <main>: Error: Final yql:"};
MakeFilter(
{{"a1", "[DataType; String]"}},
"where a2 ... 50",
[&](ui64 offset) {}
), TStatus::EId::INTERNAL_ERROR, "Failed to compile predicate 'where a2 ... 50' for purecalc filter subissue: { <main>: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: { <main>: Error: Final yql:");
);
}
}

Expand Down Expand Up @@ -332,18 +404,19 @@ Y_UNIT_TEST_SUITE(TestFilterSet) {
));

CheckError(
FiltersSet->AddFilter(MakeIntrusive<TFilterSetConsumer>(FilterIds.back(), TVector<ui64>(), TVector<TSchemaColumn>(), TString(), [&](ui64 offset) {})),
FiltersSet->AddFilter(MakeIntrusive<TFilterSetConsumer>(FilterIds.back(), TVector<ui64>(), TVector<TSchemaColumn>(), TString(), [&](ui64 offset) {}, CompileError)),
TStatus::EId::INTERNAL_ERROR,
"Failed to create new filter, filter with id [0:0:0] already exists"
);
}

Y_UNIT_TEST_F(CompilationValidation, TFilterSetFixture) {
CheckError(MakeFilter(
CompileError = {TStatus::EId::INTERNAL_ERROR, "Filed to compile purecalc program subissue: { <main>: Error: Failed to compile purecalc program subissue: { <main>: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: { <main>: Error: Final yql:"};
MakeFilter(
{{"a1", "[DataType; String]"}},
"where a2 ... 50",
[&](ui64 offset) {}
), TStatus::EId::INTERNAL_ERROR, "Failed to compile predicate 'where a2 ... 50' for purecalc filter subissue: { <main>: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: { <main>: Error: Final yql:");
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/format_handler/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ UNITTEST_FOR(ydb/core/fq/libs/row_dispatcher/format_handler)

SRCS(
# format_handler_ut.cpp
# topic_filter_ut.cpp
topic_filter_ut.cpp
topic_parser_ut.cpp
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "compile_service.h"

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>

Expand All @@ -17,13 +18,15 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
public:
TPurecalcCompileService()
: TBase(&TPurecalcCompileService::StateFunc)
, LogPrefix("TPurecalcCompileService: ")
{}

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
)

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie);
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);

TStatus status;
Expand All @@ -39,8 +42,10 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
}

if (!status.IsSuccess()) {
LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie);
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetIssues()), 0, ev->Cookie);
} else {
LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie);
Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
}
}
Expand All @@ -58,6 +63,8 @@ class TPurecalcCompileService : public NActors::TActor<TPurecalcCompileService>
}

private:
const TString LogPrefix;

std::map<TPurecalcCompileSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
)

PEERDIR(
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/row_dispatcher/events
ydb/core/fq/libs/row_dispatcher/format_handler/common
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper
Expand Down

0 comments on commit b3aabac

Please sign in to comment.