Scan optimization for filter application in the case of simple chunks
ivanmorozov333 committed Dec 10, 2024
1 parent 34cd26c commit c8bfbe5
Showing 12 changed files with 170 additions and 40 deletions.
36 changes: 24 additions & 12 deletions ydb/core/formats/arrow/arrow_filter.cpp
@@ -137,6 +137,13 @@ void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatch>

}

TColumnFilter::TApplyContext& TColumnFilter::TApplyContext::Slice(const ui32 start, const ui32 count) {
AFL_VERIFY(!StartPos && !Count);
StartPos = start;
Count = count;
return *this;
}

bool TColumnFilter::TIterator::Next(const ui32 size) {
Y_ABORT_UNLESS(size);
if (CurrentRemainVolume > size) {
@@ -311,15 +318,15 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
}

template <class TData>
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
if (!batch || !batch->num_rows()) {
return false;
}
AFL_VERIFY(!!startPos == !!count);
if (!filter.IsEmpty()) {
if (startPos) {
AFL_VERIFY(filter.Size() >= *startPos + *count)("filter_size", filter.Size())("start", *startPos)("count", *count);
AFL_VERIFY(*count == (size_t)batch->num_rows())("count", *count)("batch_size", batch->num_rows());
if (context.HasSlice()) {
AFL_VERIFY(filter.Size() >= *context.GetStartPos() + *context.GetCount())("filter_size", filter.Size())("start", context.GetStartPos())(
"count", context.GetCount());
AFL_VERIFY(*context.GetCount() == (size_t)batch->num_rows())("count", context.GetCount())("batch_size", batch->num_rows());
} else {
AFL_VERIFY(filter.Size() == (size_t)batch->num_rows())("filter_size", filter.Size())("batch_size", batch->num_rows());
}
@@ -331,20 +338,25 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
if (filter.IsTotalAllowFilter()) {
return true;
}
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
if (context.GetUseSlices() && !context.GetStartPos() && !context.GetCount()) {
batch = NAdapter::TDataBuilderPolicy<TData>::ApplySlicesFilter(batch, filter);
} else {
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(
batch, filter.BuildArrowFilter(batch->num_rows(), context.GetStartPos(), context.GetCount()));
}
return batch->num_rows();
}

bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context) const {
return ApplyImpl(*this, batch, context);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context) const {
return ApplyImpl(*this, batch, context);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context) const {
return ApplyImpl(*this, batch, context);
}

void TColumnFilter::Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const {
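The rewritten ApplyImpl only takes the slice-based path when the caller opts in via UseSlices and no sub-range (StartPos/Count) is set; otherwise it keeps the existing BuildArrowFilter mask path. A minimal caller-side sketch of the new Apply signatures follows; the function name and the concrete start/count values are illustrative and not taken from the patch:

#include <ydb/core/formats/arrow/arrow_filter.h>

// Hedged sketch, not code from this commit: how a caller drives TApplyContext.
void ApplyFilterExamples(std::shared_ptr<arrow::RecordBatch>& batch, const NKikimr::NArrow::TColumnFilter& filter) {
    using TApplyContext = NKikimr::NArrow::TColumnFilter::TApplyContext;

    // Whole-batch apply that opts into the Slice/Concatenate path.
    filter.Apply(batch, TApplyContext().SetUseSlices(true));

    // Sub-range apply, replacing the old (startPos, count) arguments. Per the
    // AFL_VERIFY checks above, count must equal batch->num_rows() and the slice
    // must fit inside the filter; a set StartPos/Count disables the slices path.
    filter.Apply(batch, TApplyContext(/*start=*/0, /*count=*/static_cast<ui32>(batch->num_rows())));
}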
46 changes: 34 additions & 12 deletions ydb/core/formats/arrow/arrow_filter.h
@@ -22,7 +22,7 @@ class TColumnFilter {
bool DefaultFilterValue = true;
bool LastValue = true;
ui32 Count = 0;
std::vector<ui32> Filter;
YDB_READONLY_DEF(std::vector<ui32>, Filter);
mutable std::optional<std::vector<bool>> FilterPlain;
mutable std::optional<ui32> FilteredCount;
TColumnFilter(const bool defaultFilterValue)
@@ -31,6 +31,14 @@

}

static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
class TMergerImpl;
void Reset(const ui32 count);
void ResetCaches() const {
FilterPlain.reset();
FilteredCount.reset();
}
public:
bool GetStartValue(const bool reverse = false) const {
if (Filter.empty()) {
return DefaultFilterValue;
@@ -46,14 +54,6 @@
}
}

static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2);
class TMergerImpl;
void Reset(const ui32 count);
void ResetCaches() const {
FilterPlain.reset();
FilteredCount.reset();
}
public:
void Append(const TColumnFilter& filter);
void Add(const bool value, const ui32 count = 1);
std::optional<ui32> GetFilteredCount() const;
@@ -199,9 +199,31 @@ class TColumnFilter {
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);

bool Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
class TApplyContext {
private:
YDB_READONLY_DEF(std::optional<ui32>, StartPos);
YDB_READONLY_DEF(std::optional<ui32>, Count);
YDB_ACCESSOR(bool, UseSlices, false);

public:
TApplyContext() = default;
bool HasSlice() const {
return !!StartPos && !!Count;
}

TApplyContext(const ui32 start, const ui32 count)
: StartPos(start)
, Count(count)
{

}

TApplyContext& Slice(const ui32 start, const ui32 count);
};

bool Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
bool Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;

// Combines filters with the 'and' operator (extFilter's size equals the number of true positions in self, and extFilter patches exactly those positions)
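The Filter vector that YDB_READONLY_DEF now exposes is a run-length encoding: it stores the lengths of alternating accept/deny runs, and GetStartValue() gives the value of the first run. A hedged reconstruction sketch follows (semantics inferred from this commit; the helper itself does not exist in the repository):

// Expand the run-length representation into a plain per-row bool mask.
std::vector<bool> ExpandFilterRuns(const NKikimr::NArrow::TColumnFilter& filter) {
    std::vector<bool> mask;
    bool current = filter.GetStartValue();
    for (const ui32 runLength : filter.GetFilter()) {
        mask.insert(mask.end(), runLength, current);   // one run of identical values
        current = !current;                            // runs alternate accept/deny
    }
    return mask;
}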
39 changes: 39 additions & 0 deletions ydb/core/formats/arrow/common/adapter.h
@@ -2,7 +2,9 @@
#include "container.h"

#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/core/formats/arrow/arrow_filter.h>

#include <ydb/library/formats/arrow/arrow_helpers.h>
#include <ydb/library/formats/arrow/common/validation.h>
#include <ydb/library/yverify_stream/yverify_stream.h>

@@ -46,6 +48,22 @@ class TDataBuilderPolicy<arrow::RecordBatch> {
Y_ABORT_UNLESS(res->kind() == arrow::Datum::RECORD_BATCH);
return res->record_batch();
}
[[nodiscard]] static std::shared_ptr<arrow::RecordBatch> ApplySlicesFilter(
const std::shared_ptr<arrow::RecordBatch>& batch, const TColumnFilter& filter) {
bool currentFiltered = filter.GetStartValue();
std::vector<std::shared_ptr<arrow::RecordBatch>> slices;
slices.reserve(filter.GetFilter().size() / 2 + 1);
ui32 currentIndex = 0;
for (auto&& i : filter.GetFilter()) {
if (currentFiltered) {
slices.emplace_back(batch->Slice(currentIndex, i));
}
currentFiltered = !currentFiltered;
currentIndex += i;
}
AFL_VERIFY(currentIndex == batch->num_rows());
return NArrow::ToBatch(TStatusValidator::GetValid(arrow::Table::FromRecordBatches(slices)), true);
}
[[nodiscard]] static std::shared_ptr<arrow::RecordBatch> GetEmptySame(const std::shared_ptr<arrow::RecordBatch>& batch) {
return batch->Slice(0, 0);
}
@@ -74,6 +92,22 @@ class TDataBuilderPolicy<arrow::Table> {
Y_ABORT_UNLESS(res->kind() == arrow::Datum::TABLE);
return res->table();
}
[[nodiscard]] static std::shared_ptr<arrow::Table> ApplySlicesFilter(
const std::shared_ptr<arrow::Table>& batch, const TColumnFilter& filter) {
bool currentFiltered = filter.GetStartValue();
std::vector<std::shared_ptr<arrow::Table>> slices;
slices.reserve(filter.GetFilter().size() / 2 + 1);
ui32 currentIndex = 0;
for (auto&& i : filter.GetFilter()) {
if (currentFiltered) {
slices.emplace_back(batch->Slice(currentIndex, i));
}
currentFiltered = !currentFiltered;
currentIndex += i;
}
AFL_VERIFY(currentIndex == batch->num_rows());
return TStatusValidator::GetValid(arrow::ConcatenateTables(slices));
}
[[nodiscard]] static std::shared_ptr<arrow::Table> GetEmptySame(const std::shared_ptr<arrow::Table>& batch) {
return batch->Slice(0, 0);
}
@@ -100,6 +134,11 @@ class TDataBuilderPolicy<TGeneralContainer> {
auto table = batch->BuildTableVerified();
return std::make_shared<TGeneralContainer>(TDataBuilderPolicy<arrow::Table>::ApplyArrowFilter(table, filter));
}
[[nodiscard]] static std::shared_ptr<TGeneralContainer> ApplySlicesFilter(
const std::shared_ptr<TGeneralContainer>& batch, const TColumnFilter& filter) {
auto table = batch->BuildTableVerified();
return std::make_shared<TGeneralContainer>(TDataBuilderPolicy<arrow::Table>::ApplySlicesFilter(table, filter));
}
[[nodiscard]] static std::shared_ptr<TGeneralContainer> GetEmptySame(const std::shared_ptr<TGeneralContainer>& batch) {
return batch->BuildEmptySame();
}
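A worked trace of the ApplySlicesFilter loop above, with illustrative values only: for a 10-row table, filter runs {3, 2, 5} and a start value of true, the loop keeps rows [0, 3) and [5, 10).

// run of 3, accepted -> slices += batch->Slice(0, 3);  currentIndex becomes 3
// run of 2, rejected -> skipped;                       currentIndex becomes 5
// run of 5, accepted -> slices += batch->Slice(5, 5);  currentIndex becomes 10
// currentIndex == num_rows() satisfies the AFL_VERIFY; the accepted slices are then
// concatenated (ConcatenateTables for tables, Table::FromRecordBatches plus ToBatch
// for record batches).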
4 changes: 2 additions & 2 deletions ydb/core/formats/arrow/reader/merger.cpp
@@ -105,7 +105,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
*lastResultPosition = TCursor(keys, 0, SortSchema->field_names());
}
if (SortHeap.Current().GetFilter()) {
SortHeap.Current().GetFilter()->Apply(result, pos.GetPosition() + (include ? 0 : 1), resultSize);
SortHeap.Current().GetFilter()->Apply(result, TColumnFilter::TApplyContext(pos.GetPosition() + (include ? 0 : 1), resultSize));
}
} else {
result = SortHeap.Current().GetKeyColumns().SliceData(startPos, resultSize);
@@ -114,7 +114,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
*lastResultPosition = TCursor(keys, keys->num_rows() - 1, SortSchema->field_names());
}
if (SortHeap.Current().GetFilter()) {
SortHeap.Current().GetFilter()->Apply(result, startPos, resultSize);
SortHeap.Current().GetFilter()->Apply(result, TColumnFilter::TApplyContext(startPos, resultSize));
}
}
if (!result || !result->num_rows()) {
17 changes: 12 additions & 5 deletions ydb/core/tx/columnshard/counters/scan.cpp
@@ -1,4 +1,5 @@
#include "scan.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>

@@ -58,11 +59,16 @@ TScanCounters::TScanCounters(const TString& module)

, BlobsReceivedCount(TBase::GetDeriviative("BlobsReceivedCount"))
, BlobsReceivedBytes(TBase::GetDeriviative("BlobsReceivedBytes"))
{
, ProcessedSourceCount(TBase::GetDeriviative("ProcessedSource/Count"))
, ProcessedSourceRawBytes(TBase::GetDeriviative("ProcessedSource/RawBytes"))
, ProcessedSourceRecords(TBase::GetDeriviative("ProcessedSource/Records"))
, ProcessedSourceEmptyCount(TBase::GetDeriviative("ProcessedSource/Empty/Count"))
, HistogramFilteredResultCount(TBase::GetHistogram("ProcessedSource/Filtered/Count", NMonitoring::ExponentialHistogram(20, 2))) {
HistogramIntervalMemoryRequiredOnFail = TBase::GetHistogram("IntervalMemory/RequiredOnFail/Gb", NMonitoring::LinearHistogram(10, 1, 1));
HistogramIntervalMemoryReduceSize = TBase::GetHistogram("IntervalMemory/Reduce/Gb", NMonitoring::ExponentialHistogram(8, 2, 1));
HistogramIntervalMemoryRequiredAfterReduce = TBase::GetHistogram("IntervalMemory/RequiredAfterReduce/Mb", NMonitoring::ExponentialHistogram(10, 2, 64));
/*
HistogramIntervalMemoryRequiredAfterReduce =
TBase::GetHistogram("IntervalMemory/RequiredAfterReduce/Mb", NMonitoring::ExponentialHistogram(10, 2, 64));
/*
{
const std::map<i64, TString> borders = {{0, "0"}, {512LLU * 1024 * 1024, "0.5Gb"}, {1LLU * 1024 * 1024 * 1024, "1Gb"},
{2LLU * 1024 * 1024 * 1024, "2Gb"}, {3LLU * 1024 * 1024 * 1024, "3Gb"},
@@ -94,7 +100,8 @@ TScanCounters::TScanCounters(const TString& module)
if (i == EStatusFinish::COUNT) {
continue;
}
ScanDurationByStatus[(ui32)i] = TBase::GetHistogram("ScanDuration/" + ::ToString(i) + "/Milliseconds", NMonitoring::ExponentialHistogram(18, 2, 1));
ScanDurationByStatus[(ui32)i] =
TBase::GetHistogram("ScanDuration/" + ::ToString(i) + "/Milliseconds", NMonitoring::ExponentialHistogram(18, 2, 1));
ScansFinishedByStatus[(ui32)i] = TBase::GetDeriviative("ScansFinished/" + ::ToString(i));
AFL_VERIFY(idx == (ui32)i);
++idx;
@@ -109,4 +116,4 @@ void TScanCounters::FillStats(::NKikimrTableStats::TTableStats& output) const {
output.SetRangeReads(ScansFinishedByStatus[(ui32)EStatusFinish::Success]->Val());
}

}
} // namespace NKikimr::NColumnShard
16 changes: 16 additions & 0 deletions ydb/core/tx/columnshard/counters/scan.h
@@ -185,8 +185,24 @@ class TScanCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr BlobsReceivedCount;
NMonitoring::TDynamicCounters::TCounterPtr BlobsReceivedBytes;

NMonitoring::TDynamicCounters::TCounterPtr ProcessedSourceCount;
NMonitoring::TDynamicCounters::TCounterPtr ProcessedSourceRawBytes;
NMonitoring::TDynamicCounters::TCounterPtr ProcessedSourceRecords;
NMonitoring::TDynamicCounters::TCounterPtr ProcessedSourceEmptyCount;
NMonitoring::THistogramPtr HistogramFilteredResultCount;

TScanCounters(const TString& module = "Scan");

void OnSourceFinished(const ui32 recordsCount, const ui64 rawBytes, const ui32 filteredRecordsCount) const {
ProcessedSourceCount->Add(1);
ProcessedSourceRawBytes->Add(rawBytes);
ProcessedSourceRecords->Add(recordsCount);
HistogramFilteredResultCount->Collect(filteredRecordsCount);
if (!filteredRecordsCount) {
ProcessedSourceEmptyCount->Add(1);
}
}

void OnOptimizedIntervalMemoryFailed(const ui64 memoryRequired) const {
HistogramIntervalMemoryRequiredOnFail->Collect(memoryRequired / (1024.0 * 1024.0 * 1024.0));
}
@@ -144,9 +144,9 @@ class TFetchedData {
}
}

void AddFilter(const NArrow::TColumnFilter& filter) {
void AddFilter(const NArrow::TColumnFilter& filter, const bool useSlices = false) {
if (UseFilter && Table) {
AFL_VERIFY(filter.Apply(Table));
AFL_VERIFY(filter.Apply(Table, NArrow::TColumnFilter::TApplyContext().SetUseSlices(useSlices)));
}
if (!Filter) {
Filter = std::make_shared<NArrow::TColumnFilter>(filter);
@@ -176,7 +176,7 @@ class TFetchedData {
DataAdded = true;
auto tableLocal = table;
if (Filter && UseFilter) {
AFL_VERIFY(Filter->Apply(tableLocal));
AFL_VERIFY(Filter->Apply(tableLocal, NArrow::TColumnFilter::TApplyContext().SetUseSlices(Filter->GetFilter().size() < 10)));
}
if (!Table) {
Table = std::make_shared<NArrow::TGeneralContainer>(tableLocal);
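The Filter->GetFilter().size() < 10 condition above is a heuristic: the slices path issues one Slice per accepted run plus a final concatenation, so it only pays off when the filter has a handful of runs (the "simple chunks" of the commit title), while a heavily fragmented filter is still cheaper to apply as a single Arrow selection mask. Expressed as a stand-alone helper, purely hypothetical and not part of the patch (only the threshold of 10 is taken from the hunk above):

// Hypothetical helper mirroring the inline heuristic; not present in the commit.
bool ShouldApplyAsSlices(const NKikimr::NArrow::TColumnFilter& filter, const ui32 maxRuns = 10) {
    // Few alternating accept/deny runs => cheap Slice/Concatenate;
    // many runs => fall back to building an Arrow filter mask.
    return filter.GetFilter().size() < maxRuns;
}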
@@ -89,7 +89,7 @@ TConclusion<bool> TFilterProgramStep::DoExecuteInplace(const std::shared_ptr<IDa
TConclusion<bool> TPredicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
auto filter =
source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(source->GetStageData().GetTable()->BuildTableVerified());
source->MutableStageData().AddFilter(filter);
source->MutableStageData().AddFilter(filter, true);
return true;
}

@@ -137,6 +137,7 @@ TConclusion<bool> TBuildFakeSpec::DoExecuteInplace(const std::shared_ptr<IDataSo
}
source->MutableStageData().AddBatch(
std::make_shared<NArrow::TGeneralContainer>(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns)));
source->SetUsedRawBytes(0);
source->Finalize({});
return true;
}
@@ -371,7 +372,7 @@ TConclusion<bool> TBuildResultStep::DoExecuteInplace(const std::shared_ptr<IData
resultBatch = source->GetStageResult().GetBatch()->BuildTableVerified(contextTableConstruct);
AFL_VERIFY((ui32)resultBatch->num_columns() == context->GetProgramInputColumns()->GetColumnNamesVector().size());
if (auto filter = source->GetStageResult().GetNotAppliedFilter()) {
filter->Apply(resultBatch, StartIndex, RecordsCount);
filter->Apply(resultBatch, NArrow::TColumnFilter::TApplyContext(StartIndex, RecordsCount));
}
if (resultBatch && resultBatch->num_rows()) {
NArrow::TStatusValidator::Validate(context->GetReadMetadata()->GetProgram().ApplyProgram(resultBatch));
@@ -8,13 +8,20 @@

namespace NKikimr::NOlap::NReader::NSimple {

void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& table, const ui32 startIndex,
void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::shared_ptr<arrow::Table>&& tableExt, const ui32 startIndex,
const ui32 recordsCount, TPlainReadData& reader) {
source->MutableResultRecordsCount() += table ? table->num_rows() : 0;
source->MutableStageResult().SetResultChunk(std::move(table), startIndex, recordsCount);
if ((!table || !table->num_rows()) && Context->GetCommonContext()->GetReadMetadata()->Limit && InFlightLimit < MaxInFlight) {

source->MutableResultRecordsCount() += tableExt ? tableExt->num_rows() : 0;
if (!tableExt || !tableExt->num_rows()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("empty_source", source->DebugJson().GetStringRobust());
}
Context->GetCommonContext()->GetCounters().OnSourceFinished(
source->GetRecordsCount(), source->GetUsedRawBytes(), tableExt ? tableExt->num_rows() : 0);

if ((!tableExt || !tableExt->num_rows()) && Context->GetCommonContext()->GetReadMetadata()->Limit && InFlightLimit < MaxInFlight) {
InFlightLimit = 2 * InFlightLimit;
}
source->MutableStageResult().SetResultChunk(std::move(tableExt), startIndex, recordsCount);
while (FetchingSources.size()) {
auto frontSource = FetchingSources.front();
if (!frontSource->HasStageResult()) {
@@ -209,6 +209,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
AFL_VERIFY(!result.HasErrors());
AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
Source->MutableStageData().SetPortionAccessor(std::move(result.ExtractPortionsVector().front()));
Source->InitUsedRawBytes();
AFL_VERIFY(Step.Next());
auto task = std::make_shared<TStepAction>(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId());
NConveyor::TScanServiceOperator::SendTaskToExecute(task);