Skip to content

Commit

Permalink
Fixed filter set counters
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 7, 2024
1 parent d7827bc commit 42bb193
Showing 1 changed file with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

#include <ydb/core/fq/libs/actors/logging/log.h>

#include <util/string/builder.h>

namespace NFq::NRowDispatcher {

namespace {
Expand Down Expand Up @@ -52,7 +50,7 @@ class TTopicFiltersSet : public ITopicFiltersSet {
~TFilterHandler() {
if (IsCompiling()) {
Self.Counters.InFlightCompileRequests->Dec();
} else if (PurecalcFilter) {
} else if (FilterStarted) {
Self.Counters.ActiveFilters->Dec();
}
}
Expand All @@ -71,16 +69,15 @@ class TTopicFiltersSet : public ITopicFiltersSet {

void CompileFilter() {
if (!PurecalcFilter) {
Self.Counters.ActiveFilters->Inc();
Consumer->OnFilterStarted();
StartFilter();
return;
}

InFlightCompilationId = Self.FreeCompileId++;
Self.Counters.InFlightCompileRequests->Inc();
Y_ENSURE(Self.InFlightCompilations.emplace(InFlightCompilationId, Consumer->GetFilterId()).second, "Got duplicated compilation event id");
LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << InFlightCompilationId);

Self.Counters.InFlightCompileRequests->Inc();
LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << InFlightCompilationId);
NActors::TActivationContext::Send(new NActors::IEventHandle(Self.Config.CompileServiceId, Self.Owner, PurecalcFilter->GetCompileRequest().release(), 0, InFlightCompilationId));
}

Expand All @@ -106,13 +103,26 @@ class TTopicFiltersSet : public ITopicFiltersSet {
InFlightCompilationId = 0;
}

private:
void StartFilter() {
if (InFlightCompilationId) {
InFlightCompilationId = 0;
Self.Counters.InFlightCompileRequests->Dec();
}

FilterStarted = true;
Self.Counters.ActiveFilters->Inc();
Consumer->OnFilterStarted();
}

private:
TTopicFiltersSet& Self;
const IFilteredDataConsumer::TPtr Consumer;
const IPurecalcFilter::TPtr PurecalcFilter;
const TString LogPrefix;

ui64 InFlightCompilationId = 0;
bool FilterStarted = false;
};

public:
Expand Down

0 comments on commit 42bb193

Please sign in to comment.