diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 1afd6db82613..e576fc8415d1 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -142,6 +142,7 @@ class TTopicSession : public TActorBootstrapped { NFq::NRowDispatcherProto::TEvStartSession Settings; NActors::TActorId ReadActorId; std::unique_ptr Filter; // empty if no predicate + ui64 InFlightCompilationId = 0; TQueue> Buffer; ui64 UnreadBytes = 0; bool DataArrivedSent = false; @@ -259,6 +260,7 @@ class TTopicSession : public TActorBootstrapped { void HandleNewEvents(); TInstant GetMinStartingMessageTimestamp() const; void AddDataToClient(TClientsInfo& client, ui64 offset, const TString& json); + void StartClientSession(TClientsInfo& info); std::pair CreateItem(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message); @@ -369,11 +371,6 @@ void TTopicSession::SubscribeOnNextEvent() { return; } - if (!FiltersCompilation.InFlightCompilations.empty()) { - LOG_ROW_DISPATCHER_TRACE("In flight filters compilation: " << FiltersCompilation.InFlightCompilations.size() << ", skip yds event handling"); - return; - } - if (Config.GetMaxSessionUsedMemory() && UnreadBytes > Config.GetMaxSessionUsedMemory()) { LOG_ROW_DISPATCHER_TRACE("Too much used memory (" << UnreadBytes << " bytes), skip subscribing to WaitEvent()"); return; @@ -556,11 +553,6 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { void TTopicSession::HandleNewEvents() { ui64 handledEventsSize = 0; - if (!FiltersCompilation.InFlightCompilations.empty()) { - LOG_ROW_DISPATCHER_TRACE("In flight filters compilation: " << FiltersCompilation.InFlightCompilations.size() << ", skip yds event handling"); - return; - } - for (ui64 i = 0; i < MaxHandledEventsCount; ++i) { if (!ReadSession) { return; @@ -711,6 +703,9 @@ void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector< LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << lastOffset); for (auto& [actorId, info] : Clients) { + if (info.InFlightCompilationId) { // filter compilation in flight + continue; + } if (info.NextMessageOffset && lastOffset < info.NextMessageOffset) { // the batch has already been processed continue; } @@ -798,6 +793,28 @@ bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) { return false; } +void TTopicSession::StartClientSession(TClientsInfo& info) { + if (ReadSession) { + if (info.Settings.HasOffset() && info.Settings.GetOffset() <= LastMessageOffset) { + LOG_ROW_DISPATCHER_INFO("New client has less offset (" << info.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); + Metrics.RestartSessionByOffsets->Inc(); + ++RestartSessionByOffsets; + info.RestartSessionByOffsetsByQuery->Inc(); + StopReadSession(); + } + } + + if (Parser) { + // Parse remains data before changing parsing schema + DoParsing(true); + } + UpdateParser(); + + if (!ReadSession) { + Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession()); + } +} + void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << ev->Get()->Record.GetSource().GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset()); @@ -810,10 +827,6 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { auto types = GetVector(ev->Get()->Record.GetSource().GetColumnTypes()); try { - if (Parser) { - // Parse remains data before adding new client - DoParsing(true); - } auto queryGroup = Counters->GetSubgroup("queryId", ev->Get()->Record.GetQueryId()); auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(TopicPath)); auto& clientInfo = Clients.emplace( @@ -841,34 +854,23 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { {.EnabledLLVM = source.GetEnabledLLVM()} ); - const ui64 eventId = FiltersCompilation.FreeId++; - Y_ENSURE(FiltersCompilation.InFlightCompilations.emplace(eventId, ev->Sender).second, "Got duplicated compilation event id"); - LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << eventId); + clientInfo.InFlightCompilationId = ++FiltersCompilation.FreeId; + Y_ENSURE(FiltersCompilation.InFlightCompilations.emplace(clientInfo.InFlightCompilationId, ev->Sender).second, "Got duplicated compilation event id"); + LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << clientInfo.InFlightCompilationId); - Send(CompileServiceActorId, clientInfo.Filter->GetCompileRequest().release(), 0, eventId); + Send(CompileServiceActorId, clientInfo.Filter->GetCompileRequest().release(), 0, clientInfo.InFlightCompilationId); Metrics.InFlightCompileRequests->Inc(); } else { ClientsWithoutPredicate.insert(ev->Sender); - } - if (ReadSession) { - if (clientInfo.Settings.HasOffset() && (clientInfo.Settings.GetOffset() <= LastMessageOffset)) { - LOG_ROW_DISPATCHER_INFO("New client has less offset (" << clientInfo.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session"); - Metrics.RestartSessionByOffsets->Inc(); - ++RestartSessionByOffsets; - clientInfo.RestartSessionByOffsetsByQuery->Inc(); - StopReadSession(); - } + // In case of in flight compilation topic session will be checked after getting compile response + StartClientSession(clientInfo); } } catch (...) { FatalError("Adding new client failed, got unexpected exception: " + CurrentExceptionMessage(), nullptr, true, Nothing()); } ConsumerName = ev->Get()->Record.GetSource().GetConsumerName(); - UpdateParser(); SendStatisticToRowDispatcher(); - if (!ReadSession) { - Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession()); - } } void TTopicSession::AddDataToClient(TClientsInfo& info, ui64 offset, const TString& json) { @@ -1117,9 +1119,17 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& e LOG_ROW_DISPATCHER_TRACE("Compile response ignored for id " << ev->Cookie << ", client with id " << clientId << " not found"); return; } - if (const auto& filter = clientIt->second.Filter) { - filter->OnCompileResponse(std::move(ev)); + + auto& clientInfo = clientIt->second; + if (ev->Cookie != clientInfo.InFlightCompilationId) { + LOG_ROW_DISPATCHER_TRACE("Outdated compiler response ignored for id " << ev->Cookie << ", client with id " << clientId << " changed"); + return; } + + Y_ENSURE(clientInfo.Filter, "Unexpected completion response for client without filter"); + clientInfo.Filter->OnCompileResponse(std::move(ev)); + clientInfo.InFlightCompilationId = 0; + StartClientSession(clientInfo); } TString TTopicSession::GetAnyQueryIdByFieldName(const TString& fieldName) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 61e4060c046a..382f32957ff2 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -15,6 +15,8 @@ #include +#include + namespace { using namespace NKikimr; @@ -24,6 +26,38 @@ using namespace NYql::NDq; const ui64 TimeoutBeforeStartSessionSec = 3; const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; +class TPurecalcCompileServiceMock : public NActors::TActor { + using TBase = NActors::TActor; + +public: + TPurecalcCompileServiceMock(TActorId owner) + : TBase(&TPurecalcCompileServiceMock::StateFunc) + , Owner(owner) + , ProgramFactory(NYql::NPureCalc::MakeProgramFactory()) + {} + + STRICT_STFUNC(StateFunc, + hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle); + ) + + void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) { + IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder); + + try { + programHolder->CreateProgram(ProgramFactory); + } catch (const NYql::NPureCalc::TCompileError& e) { + UNIT_ASSERT_C(false, "Failed to compile purecalc filter: sql: " << e.GetYql() << ", error: " << e.GetIssues()); + } + + Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie); + Send(Owner, new NActors::TEvents::TEvPing()); + } + +private: + const TActorId Owner; + const NYql::NPureCalc::IProgramFactoryPtr ProgramFactory; +}; + class TFixture : public NUnitTest::TBaseFixture { public: TFixture() @@ -60,7 +94,8 @@ class TFixture : public NUnitTest::TBaseFixture { std::make_shared(), nullptr); - const auto compileServiceActorId = Runtime.Register(NRowDispatcher::CreatePurecalcCompileService()); + CompileNotifier = Runtime.AllocateEdgeActor(); + const auto compileServiceActorId = Runtime.Register(new TPurecalcCompileServiceMock(CompileNotifier)); TopicSession = Runtime.Register(NewTopicSession( topicPath, @@ -86,7 +121,7 @@ class TFixture : public NUnitTest::TBaseFixture { void TearDown(NUnitTest::TTestContext& /* context */) override { } - void StartSession(TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source, TMaybe readOffset = Nothing()) { + void StartSession(TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source, TMaybe readOffset = Nothing(), bool expectedError = false) { auto event = new NFq::TEvRowDispatcher::TEvStartSession( source, PartitionId, @@ -95,6 +130,13 @@ class TFixture : public NUnitTest::TBaseFixture { 0, // StartingMessageTimestamp; "QueryId"); Runtime.Send(new IEventHandle(TopicSession, readActorId, event)); + + const auto& predicate = source.GetPredicate(); + if (predicate && !expectedError) { + // Wait predicate compilation + const auto ping = Runtime.GrabEdgeEvent(CompileNotifier); + UNIT_ASSERT_C(ping, "Compilation is not performed for predicate: " << predicate); + } } NYql::NPq::NProto::TDqPqTopicSource BuildSource(TString topic, bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) { @@ -177,6 +219,7 @@ class TFixture : public NUnitTest::TBaseFixture { TActorSystemStub ActorSystemStub; NActors::TActorId TopicSession; NActors::TActorId RowDispatcherActorId; + NActors::TActorId CompileNotifier; NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr"))); std::shared_ptr CredentialsProviderFactory; NActors::TActorId ReadActorId1; @@ -208,7 +251,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}); auto source2 = BuildSource(topicName, false, "OtherConsumer"); - StartSession(ReadActorId3, source2); + StartSession(ReadActorId3, source2, Nothing(), true); ExpectSessionError(ReadActorId3, "Use the same consumer"); StopSession(ReadActorId1, source); @@ -484,7 +527,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { auto source2 = BuildSource(topicName); source2.AddColumns("field1"); source2.AddColumnTypes("[DataType; String]"); - StartSession(ReadActorId2, source2); + StartSession(ReadActorId2, source2, Nothing(), true); ExpectSessionError(ReadActorId2, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])"); } }