From 7ca34820ef895c1e93f038bdd541b83cd9d597d6 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 7 Dec 2024 19:21:35 +0000 Subject: [PATCH] Fixed topic_session_ut.cpp --- .../format_handler/format_handler.cpp | 14 ++- .../format_handler/ut/common/ut_common.cpp | 40 ++++++- .../format_handler/ut/common/ut_common.h | 8 +- .../fq/libs/row_dispatcher/topic_session.cpp | 6 +- .../row_dispatcher/ut/topic_session_ut.cpp | 103 ++++-------------- 5 files changed, 72 insertions(+), 99 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 3a59f9d5e2fe..7a5565fb1160 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -208,6 +208,12 @@ class TTopicFormatHandler : public NActors::TActor, public } void OnFilteredData(ui64 rowId) override { + const ui64 offset = Self.Offsets->at(rowId); + if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) { + LOG_ROW_DISPATCHER_TRACE("OnFilteredData, skip historical offset: " << offset << ", next message offset: " << *nextOffset); + return; + } + Y_DEFER { // Values allocated on parser allocator and should be released FilteredRow.assign(Columns.size(), NYql::NUdf::TUnboxedValue()); @@ -218,8 +224,6 @@ class TTopicFormatHandler : public NActors::TActor, public FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId); } DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size()); - - const ui64 offset = Self.Offsets->at(rowId); FilteredOffsets.emplace_back(offset); const ui64 newPackerSize = DataPacker->PackedSizeEstimate(); @@ -555,7 +559,11 @@ class TTopicFormatHandler : public NActors::TActor, public //// ITopicFormatHandler::TDestroy void ITopicFormatHandler::TDestroy::Destroy(ITopicFormatHandler* handler) { - NActors::TActivationContext::ActorSystem()->Send(handler->GetSelfId(), new NActors::TEvents::TEvPoison()); + if (NActors::TlsActivationContext) { + NActors::TActivationContext::ActorSystem()->Send(handler->GetSelfId(), new NActors::TEvents::TEvPoison()); + } else { + // Destroy from not AS thread my be caused only in case AS destruction (so handler will be deleted) + } } ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp index 6de82b89e1a1..23cfad654d0b 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp @@ -55,6 +55,28 @@ void SegmentationFaultHandler(int) { //// TBaseFixture::ICell +class TOptionalCell : public TBaseFixture::ICell { +public: + TOptionalCell(ICell::TPtr value) + : Value(value) + {} + +public: + TString GetType() const override { + return TStringBuilder() << "[OptionalType; " << Value->GetType() << "]"; + } + + void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override { + if (!parsedValue) { + return; + } + Value->Validate(parsedValue.GetOptionalValue()); + } + +private: + const ICell::TPtr Value; +}; + class TStringSell : public TBaseFixture::ICell { public: TStringSell(const TString& value) @@ -97,21 +119,29 @@ class TUint64Sell : public TBaseFixture::ICell { //// TBaseFixture::TRow -TBaseFixture::TRow& TBaseFixture::TRow::AddCell(ICell::TPtr cell) { +TBaseFixture::TRow& TBaseFixture::TRow::AddCell(ICell::TPtr cell, bool optional) { + if (optional) { + cell = MakeIntrusive(cell); + } + Cells.emplace_back(cell); return *this; } -TBaseFixture::TRow& TBaseFixture::TRow::AddString(const TString& value) { - return AddCell(MakeIntrusive(value)); +TBaseFixture::TRow& TBaseFixture::TRow::AddString(const TString& value, bool optional) { + return AddCell(MakeIntrusive(value), optional); } -TBaseFixture::TRow& TBaseFixture::TRow::AddUint64(ui64 value) { - return AddCell(MakeIntrusive(value)); +TBaseFixture::TRow& TBaseFixture::TRow::AddUint64(ui64 value, bool optional) { + return AddCell(MakeIntrusive(value), optional); } //// TBaseFixture::TBatch +TBaseFixture::TBatch::TBatch(std::initializer_list rows) + : Rows(rows) +{} + TBaseFixture::TBatch& TBaseFixture::TBatch::AddRow(TRow row) { Rows.emplace_back(row); return *this; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h index 514093c7207b..df5f5049192f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h @@ -26,9 +26,10 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser { public: TRow() = default; - TRow& AddCell(ICell::TPtr cell); - TRow& AddString(const TString& value); - TRow& AddUint64(ui64 value); + TRow& AddCell(ICell::TPtr cell, bool optional); + + TRow& AddString(const TString& value, bool optional = false); + TRow& AddUint64(ui64 value, bool optional = false); public: TVector Cells; @@ -37,6 +38,7 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser { class TBatch { public: TBatch() = default; + TBatch(std::initializer_list rows); TBatch& AddRow(TRow row); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 56f8af2ac74d..e5f5725b3827 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -147,10 +147,7 @@ class TTopicSession : public TActorBootstrapped { } void AddDataToClient(ui64 offset, ui64 rowSize) override { - if (NextMessageOffset && offset < *NextMessageOffset) { - LOG_ROW_DISPATCHER_TRACE("AddDataToClient " << ReadActorId << " skip historical offset: " << offset << ", next message offset: " << *NextMessageOffset); - return; - } + Y_ENSURE(!NextMessageOffset || offset >= *NextMessageOffset, "Unexpected historical offset"); LOG_ROW_DISPATCHER_TRACE("AddDataToClient to " << ReadActorId << ", offset: " << offset << ", serialized size: " << rowSize); @@ -363,6 +360,7 @@ void TTopicSession::Bootstrap() { void TTopicSession::PassAway() { LOG_ROW_DISPATCHER_DEBUG("PassAway"); StopReadSession(); + FormatHandlers.clear(); TBase::PassAway(); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 80e757acf41f..5892d8e13f1f 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -26,68 +26,10 @@ using namespace NYql::NDq; const ui64 TimeoutBeforeStartSessionSec = 3; const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; -class TPurecalcCompileServiceMock : public NActors::TActor { - using TBase = NActors::TActor; - -public: - TPurecalcCompileServiceMock(TActorId owner) - : TBase(&TPurecalcCompileServiceMock::StateFunc) - , Owner(owner) - , ProgramFactory(NYql::NPureCalc::MakeProgramFactory()) - {} - - STRICT_STFUNC(StateFunc, - hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); - ) - - void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { - IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder); - - try { - programHolder->CreateProgram(ProgramFactory); - } catch (const NYql::NPureCalc::TCompileError& e) { - UNIT_ASSERT_C(false, "Failed to compile purecalc filter: sql: " << e.GetYql() << ", error: " << e.GetIssues()); - } - - Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie); - Send(Owner, new NActors::TEvents::TEvPing()); - } - -private: - const TActorId Owner; - const NYql::NPureCalc::IProgramFactoryPtr ProgramFactory; -}; - class TFixture : public NTests::TBaseFixture { public: using TBase = NTests::TBaseFixture; - struct TExpectedCell { - enum class EMessageType { - String, - Int - }; - - EMessageType Type; - - ui64 ExperctedInt = 0; - TString ExperctedString = ""; - }; - - struct TExpectedRow { - TVector ExpectedCells; - - TExpectedRow& AddString(const TString& expectedString) { - ExpectedCells.push_back(TExpectedCell{.Type = TExpectedCell::EMessageType::String, .ExperctedString = expectedString}); - return *this; - } - - TExpectedRow& AddInt(ui64 expectedInt) { - ExpectedCells.push_back(TExpectedCell{.Type = TExpectedCell::EMessageType::Int, .ExperctedInt = expectedInt}); - return *this; - } - }; - public: void SetUp(NUnitTest::TTestContext& ctx) override { TBase::SetUp(ctx); @@ -115,7 +57,7 @@ class TFixture : public NTests::TBaseFixture { nullptr); CompileNotifier = Runtime.AllocateEdgeActor(); - const auto compileServiceActorId = Runtime.Register(new TPurecalcCompileServiceMock(CompileNotifier)); + const auto compileServiceActorId = Runtime.Register(CreatePurecalcCompileServiceMock(CompileNotifier)); TopicSession = Runtime.Register(NewTopicSession( topicPath, @@ -181,30 +123,17 @@ class TFixture : public NTests::TBaseFixture { Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release())); } - void ExpectMessageBatch(NActors::TActorId readActorId, const std::vector& expected) { + void ExpectMessageBatch(NActors::TActorId readActorId, const TBatch& expected) { Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch())); auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); - UNIT_ASSERT_VALUES_EQUAL(expected.size(), eventHolder->Get()->Record.MessagesSize()); - for (size_t i = 0; i < expected.size(); ++i) { - NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(i); - - UNIT_ASSERT_VALUES_EQUAL_C(message.ColumnsPayloadSize(), expected[i].ExpectedCells.size(), "Message id: " << i); - for (size_t j = 0; const auto& expectedCell : expected[i].ExpectedCells) { - const auto& serializedColumn = eventHolder->Get()->GetPayload(message.GetColumnsPayload().Get(j++)); - switch (expectedCell.Type) { - case TExpectedCell::EMessageType::String: - CheckStringColumn(serializedColumn, expectedCell.ExperctedString); - break; - - case TExpectedCell::EMessageType::Int: - CheckIntColumn(serializedColumn, expectedCell.ExperctedInt); - break; - } - } - } + UNIT_ASSERT_VALUES_EQUAL(1, eventHolder->Get()->Record.MessagesSize()); + + NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0); + UNIT_ASSERT_VALUES_EQUAL(message.OffsetsSize(), expected.Rows.size()); + CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expected); } void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatusCode statusCode, TString message = "") { @@ -233,7 +162,13 @@ class TFixture : public NTests::TBaseFixture { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); - return eventHolder->Get()->Record.MessagesSize(); + + size_t numberMessages = 0; + for (const auto& message : eventHolder->Get()->Record.GetMessages()) { + numberMessages += message.OffsetsSize(); + } + + return numberMessages; } void ExpectStatisticToReadActor(TSet readActorIds) { @@ -246,8 +181,8 @@ class TFixture : public NTests::TBaseFixture { } } - static TExpectedRow JsonMessage(ui64 index) { - return TExpectedRow().AddInt(100 * index).AddString(TStringBuilder() << "value" << index); + static TRow JsonMessage(ui64 index) { + return TRow().AddUint64(100 * index).AddString(TStringBuilder() << "value" << index); } public: @@ -371,10 +306,10 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { StartSession(ReadActorId2, source, 2); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); - std::vector expected1 = { JsonMessage(2), JsonMessage(3) }; + TBatch expected1 = { JsonMessage(2), JsonMessage(3) }; ExpectMessageBatch(ReadActorId1, expected1); - std::vector expected2 = { JsonMessage(3) }; + TBatch expected2 = { JsonMessage(3) }; ExpectMessageBatch(ReadActorId2, expected2); const std::vector data2 = { Json4 }; @@ -569,7 +504,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json1 = "{\"dt\":100,\"field1\":\"str\",\"value\":\"value1\"}"; PQWrite({ json1 }, topicName); ExpectNewDataArrived({ReadActorId1}); - ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str") }); + ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str", true) }); auto source2 = BuildSource(topicName); source2.AddColumns("field1");