Skip to content

Commit

Permalink
Fixed historical offsets filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 9, 2024
1 parent abe43f5 commit 624a559
Showing 1 changed file with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,23 @@ class TTopicFiltersSet : public ITopicFiltersSet {

const TInstant startFilter = TInstant::Now();
for (const auto& [_, filterHandler] : Filters) {
const auto consumer = filterHandler.GetConsumer();
if (const auto nextOffset = consumer->GetNextMessageOffset(); !numberRows || (nextOffset && offsets[numberRows - 1] < *nextOffset)) {
LOG_ROW_DISPATCHER_TRACE("Ignore filtering for " << consumer->GetFilterId() << ", historical offset");
continue;
}
if (!filterHandler.IsStarted()) {
LOG_ROW_DISPATCHER_TRACE("Ignore filtering for " << consumer->GetFilterId() << ", client filter is not compiled");
continue;
}

if (filterHandler.GetPurecalcFilter()) {
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
continue;
}

// Clients without filters
const auto consumer = filterHandler.GetConsumer();
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");

for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
consumer->OnFilteredData(rowId);
}
Expand Down Expand Up @@ -217,19 +225,10 @@ class TTopicFiltersSet : public ITopicFiltersSet {

private:
void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
const auto consumer = filterHandler.GetConsumer();
if (const auto nextOffset = consumer->GetNextMessageOffset(); !numberRows || (nextOffset && offsets[numberRows - 1] < *nextOffset)) {
LOG_ROW_DISPATCHER_TRACE("Ignore filtering for " << consumer->GetFilterId() << ", historical offset");
return;
}
if (!filterHandler.IsStarted()) {
LOG_ROW_DISPATCHER_TRACE("Ignore filtering for " << consumer->GetFilterId() << ", client filter is not compiled");
return;
}

const auto filter = filterHandler.GetPurecalcFilter();
Y_ENSURE(filter, "Expected initialized filter");

const auto consumer = filterHandler.GetConsumer();
const auto& columnIds = consumer->GetColumnIds();

TVector<const TVector<NYql::NUdf::TUnboxedValue>*> result;
Expand Down

0 comments on commit 624a559

Please sign in to comment.