From 42bb193f8df163167d15e845bb30df408fb9fc9e Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 7 Dec 2024 14:14:32 +0000 Subject: [PATCH] Fixed filter set counters --- .../format_handler/filters/filters_set.cpp | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index c165c0cb3259..0d07712a8796 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -2,8 +2,6 @@ #include -#include - namespace NFq::NRowDispatcher { namespace { @@ -52,7 +50,7 @@ class TTopicFiltersSet : public ITopicFiltersSet { ~TFilterHandler() { if (IsCompiling()) { Self.Counters.InFlightCompileRequests->Dec(); - } else if (PurecalcFilter) { + } else if (FilterStarted) { Self.Counters.ActiveFilters->Dec(); } } @@ -71,16 +69,15 @@ class TTopicFiltersSet : public ITopicFiltersSet { void CompileFilter() { if (!PurecalcFilter) { - Self.Counters.ActiveFilters->Inc(); - Consumer->OnFilterStarted(); + StartFilter(); return; } InFlightCompilationId = Self.FreeCompileId++; + Self.Counters.InFlightCompileRequests->Inc(); Y_ENSURE(Self.InFlightCompilations.emplace(InFlightCompilationId, Consumer->GetFilterId()).second, "Got duplicated compilation event id"); - LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << InFlightCompilationId); - Self.Counters.InFlightCompileRequests->Inc(); + LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << InFlightCompilationId); NActors::TActivationContext::Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId)); } @@ -106,6 +103,18 @@ class TTopicFiltersSet : public ITopicFiltersSet { InFlightCompilationId = 0; } + private: + void StartFilter() { + if (InFlightCompilationId) { + InFlightCompilationId = 0; + Self.Counters.InFlightCompileRequests->Dec(); + } + + FilterStarted = true; + Self.Counters.ActiveFilters->Inc(); + Consumer->OnFilterStarted(); + } + private: TTopicFiltersSet& Self; const IFilteredDataConsumer::TPtr Consumer; @@ -113,6 +122,7 @@ class TTopicFiltersSet : public ITopicFiltersSet { const TString LogPrefix; ui64 InFlightCompilationId = 0; + bool FilterStarted = false; }; public: