Skip to content

Commit

Permalink
Passed unboxed values from filter to read actor
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 4, 2024
1 parent d6a279e commit ba64816
Show file tree
Hide file tree
Showing 58 changed files with 4,281 additions and 2,243 deletions.
8 changes: 6 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#pragma once

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>

#include <util/generic/ptr.h>

namespace NFq::NRowDispatcher {

struct IActorFactory : public TThrRefBase {
Expand Down
106 changes: 106 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include "common.h"

#include <util/string/builder.h>

#include <yql/essentials/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {

namespace {

class TPureCalcProgramFactory : public IPureCalcProgramFactory {
public:
TPureCalcProgramFactory() {
CreateFactory({.EnabledLLVM = false});
CreateFactory({.EnabledLLVM = true});
}

NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
const auto it = ProgramFactories.find(settings);
Y_ENSURE(it != ProgramFactories.end());
return it->second;
}

TGuard<TMutex> LockFactory() const override {
return Guard(FactoryMutex);
}

private:
void CreateFactory(const TSettings& settings) {
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
NYql::NPureCalc::TProgramFactoryOptions()
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
)});
}

private:
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
TMutex FactoryMutex;
};

} // anonymous namespace

//// TStatus

// Default status: SUCCESS with an empty issue list.
TStatus::TStatus()
    : TStatus(EId::SUCCESS)
{}

// Creates a status with the given code and takes ownership of the passed issues.
TStatus::TStatus(TStatusCode status, NYql::TIssues issues)
: Status(status)
, Issues(std::move(issues))
{}

// Creates a status with the given code and a single issue built from the message.
TStatus::TStatus(TStatusCode status, TString message)
    : TStatus(status, NYql::TIssues({NYql::TIssue(std::move(message))}))
{}

// Returns true when the stored status code is SUCCESS.
bool TStatus::IsSuccess() const {
return Status == EId::SUCCESS;
}

// Converts to true when the status is NOT successful, so `if (status)` reads as "if failed".
TStatus::operator bool() const {
return !IsSuccess();
}

// Returns the stored status code.
TStatus::TStatusCode TStatus::GetStatus() const {
return Status;
}

// Returns a reference to the stored issues; it refers to a member of this status object.
const NYql::TIssues& TStatus::GetIssues() const {
return Issues;
}

// Renders the status as "Status: <code name>, Issues: <issues on one line>".
TString TStatus::ToString() const {
    TStringBuilder description;
    description << "Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(Status);
    description << ", Issues: " << Issues.ToOneLineString();
    return description;
}

// Wraps all currently stored issues into the given one: each stored issue is copied
// as a sub-issue of `issue`, which then becomes the only top-level issue.
// Returns *this to allow chaining.
TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {
    for (const auto& nestedIssue : Issues) {
        auto nestedCopy = MakeIntrusive<NYql::TIssue>(nestedIssue);
        issue.AddSubIssue(nestedCopy);
    }

    Issues = {std::move(issue)};
    return *this;
}

// Convenience overload: builds the parent issue from a plain message.
TStatus& TStatus::AddParentIssue(TString message) {
    NYql::TIssue parentIssue(std::move(message));
    return AddParentIssue(std::move(parentIssue));
}

// Appends the issue to the stored issues; the status code is left untouched.
// Returns *this to allow chaining.
TStatus& TStatus::AddIssue(NYql::TIssue issue) {
Issues.AddIssue(std::move(issue));
return *this;
}

// Convenience overload: builds the issue from a plain message and appends it.
TStatus& TStatus::AddIssue(TString message) {
    NYql::TIssue issue{std::move(message)};
    return AddIssue(std::move(issue));
}

//// IPureCalcProgramFactory

// Creates the default IPureCalcProgramFactory implementation.
IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
    IPureCalcProgramFactory::TPtr factory = MakeIntrusive<TPureCalcProgramFactory>();
    return factory;
}

} // namespace NFq::NRowDispatcher
100 changes: 100 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#pragma once

#include <util/system/mutex.h>

#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>

#include <yql/essentials/public/issue/yql_issue.h>
#include <yql/essentials/public/purecalc/common/fwd.h>

namespace NFq::NRowDispatcher {

// Exception type of the format handler code; inherits all yexception constructors.
struct TFormatHandlerException : public yexception {
using yexception::yexception;
};

// Operation result: a DQ status code together with a list of issues.
class TStatus {
public:
using EId = NYql::NDqProto::StatusIds;
using TStatusCode = EId::StatusCode;

public:
// Creates a SUCCESS status without issues
TStatus();
// Creates a status with the given code and (optionally) issues
explicit TStatus(TStatusCode status, NYql::TIssues issues = {});
// Creates a status with the given code and a single issue built from the message
TStatus(TStatusCode status, TString message);

// True when the status code is SUCCESS; virtual, derived classes may add extra conditions
virtual bool IsSuccess() const;
operator bool() const; // Equal to !IsSuccess, i.e. true means failure

TStatusCode GetStatus() const;
const NYql::TIssues& GetIssues() const;

// Human-readable representation with the status code name and one-line issues
TString ToString() const;

// Replace current issues with a single parent issue that has them as sub-issues; return *this
TStatus& AddParentIssue(NYql::TIssue issue);
TStatus& AddParentIssue(TString message);

// Append one more issue to the list; return *this
TStatus& AddIssue(NYql::TIssue issue);
TStatus& AddIssue(TString message);

protected:
TStatusCode Status;
NYql::TIssues Issues;
};

// Status that additionally carries a value on success.
// Built either from a value (status becomes SUCCESS) or from a plain TStatus (no value is stored).
template <typename TValue>
class TValueStatus : public TStatus {
    using TBase = TStatus;

public:
    // Successful result holding the value
    TValueStatus(TValue value)
        : TBase(EId::SUCCESS)
        , Value(std::move(value))
    {}

    // Result without a value, carrying only the status code and issues
    TValueStatus(TStatus status)
        : TBase(std::move(status))
    {}

    // Successful only if the base status is SUCCESS and a value is stored
    bool IsSuccess() const override {
        if (!TBase::IsSuccess()) {
            return false;
        }
        return Value.has_value();
    }

    // Returns the stored value, throws TFormatHandlerException if there is none
    TValue& GetValue() {
        if (Y_UNLIKELY(!Value.has_value())) {
            throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString();
        }
        return Value.value();
    }

private:
    std::optional<TValue> Value;
};

// Description of a single schema column: its name and its type as a YSON string.
struct TSchemaColumn {
TString Name;
TString TypeYson;

// Member-wise equality
bool operator==(const TSchemaColumn& other) const = default;
};

// Ref-counted provider of purecalc program factories selected by settings.
class IPureCalcProgramFactory : public TThrRefBase {
public:
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;

// Settings that select a program factory; ordered so they can be used as a map key
struct TSettings {
bool EnabledLLVM = false;

std::strong_ordering operator<=>(const TSettings& other) const = default;
};

public:
// Returns the purecalc program factory that corresponds to the given settings
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;

// The factory should be locked with this guard before creating a purecalc program
virtual TGuard<TMutex> LockFactory() const = 0;
};

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();

} // namespace NFq::NRowDispatcher
18 changes: 18 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
LIBRARY()

SRCS(
common.cpp
)

PEERDIR(
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper

ydb/library/yql/dq/actors
ydb/library/yql/dq/actors/protos

yql/essentials/public/issue
)

YQL_LAST_ABI_VERSION()

END()
124 changes: 124 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#include "filters_set.h"

#include <util/string/builder.h>

namespace NFq::NRowDispatcher {

namespace {

// Set of filters attached to one topic. Keeps one handler per consumer (keyed by the
// consumer's filter id) and pushes parsed rows either through the consumer's purecalc
// filter or, for consumers without a WHERE filter, directly to the consumer.
class TTopicFiltersSet : public ITopicFiltersSet {
    // Pairs a consumer with its purecalc filter.
    // The filter pointer is empty for consumers that have no WHERE filter.
    class TFilterHandler {
    public:
        TFilterHandler(IFilteredDataConsumer::TPtr consumer, IPurecalcFilter::TPtr purecalcFilter)
            : Consumer(std::move(consumer))
            , PurecalcFilter(std::move(purecalcFilter))
        {}

        IFilteredDataConsumer::TPtr GetConsumer() const {
            return Consumer;
        }

        IPurecalcFilter::TPtr GetPurecalcFilter() const {
            return PurecalcFilter;
        }

    private:
        IFilteredDataConsumer::TPtr Consumer;
        IPurecalcFilter::TPtr PurecalcFilter;
    };

public:
    explicit TTopicFiltersSet(const TTopicFiltersSetConfig& config)
        : Config(config)
    {}

public:
    // Passes a batch of `numberRows` parsed rows to every registered consumer.
    //   columnIndex - maps a column id to an index inside `values`
    //   offsets     - message offsets of the rows
    //   values      - per-column value vectors, a null entry means the column is unavailable
    void FilterData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) override {
        for (const auto& [_, filterHandler] : Filters) {
            if (filterHandler.GetPurecalcFilter()) {
                PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
                continue;
            }

            // Clients without filters
            auto consumer = filterHandler.GetConsumer();
            for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
                consumer->OnFilteredData(rowId);
            }
        }
    }

    // Registers a consumer. A purecalc filter is created only if the consumer has a WHERE filter.
    // Returns the filter creation error, or INTERNAL_ERROR if the filter id is already registered.
    TStatus AddFilter(IFilteredDataConsumer::TPtr filter) override {
        IPurecalcFilter::TPtr purecalcFilter;
        if (filter->GetWhereFilter()) {
            auto filterStatus = CreatePurecalcFilter(filter, Config.PureCalcConfig);
            if (!filterStatus.IsSuccess()) {
                return filterStatus;
            }
            purecalcFilter = std::move(filterStatus.GetValue());
        }

        if (!Filters.insert({filter->GetFilterId(), TFilterHandler(filter, std::move(purecalcFilter))}).second) {
            return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new filter, filter with id " << filter->GetFilterId() << " already exists");
        }

        return TStatus();
    }

    // Unregisters the consumer with the given id, does nothing if it is unknown.
    void RemoveFilter(NActors::TActorId filterId) override {
        Filters.erase(filterId);
    }

    // Collects statistics of all purecalc filters; consumers without a filter are not reported.
    TStatistics GetStatistics() override {
        TStatistics statistics;
        for (const auto& [filterId, filterHandler] : Filters) {
            if (auto filter = filterHandler.GetPurecalcFilter()) {
                statistics.FiltersStatistics.insert({filterId, filter->GetStatistics()});
            }
        }
        return statistics;
    }

private:
    // Selects the columns required by the consumer and runs the purecalc filter over them.
    // The batch is skipped when it is empty, when it ends before the consumer's next message
    // offset, or when any required column has a null entry in `values`.
    void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
        const auto consumer = filterHandler.GetConsumer();

        // nextOffset is checked before it is dereferenced.
        // NOTE(review): presumably an optional offset; an empty value is treated as
        // "no historical boundary", so the batch is filtered - confirm against the consumer contract.
        if (const auto nextOffset = consumer->GetNextMessageOffset(); !numberRows || (nextOffset && offsets[numberRows - 1] < *nextOffset)) {
            // Do not filter historical data
            return;
        }

        auto filter = filterHandler.GetPurecalcFilter();
        Y_ENSURE(filter, "Expected initialized filter");

        const auto& columnIds = consumer->GetColumnIds();

        TVector<const TVector<NYql::NUdf::TUnboxedValue>*> result;
        result.reserve(columnIds.size());
        for (ui64 columnId : columnIds) {
            Y_ENSURE(columnId < columnIndex.size(), "Unexpected column id " << columnId << ", it is larger than index array size " << columnIndex.size());
            const ui64 index = columnIndex[columnId];

            // The index must be validated before `values` is accessed with it
            Y_ENSURE(index < values.size(), "Unexpected column index " << index << ", it is larger than values array size " << values.size());

            const auto* columnValues = values[index];
            if (!columnValues) {
                // Column was lost during parsing, skip filter
                return;
            }

            result.push_back(columnValues);
        }

        filter->FilterData(result, numberRows);
    }

private:
    const TTopicFiltersSetConfig Config;

    // Registered consumers keyed by their filter id
    std::unordered_map<NActors::TActorId, TFilterHandler> Filters;
};

} // anonymous namespace

// Creates the default ITopicFiltersSet implementation with the given config.
ITopicFiltersSet::TPtr CreateTopicFiltersSet(const TTopicFiltersSetConfig& config) {
    ITopicFiltersSet::TPtr filtersSet = MakeIntrusive<TTopicFiltersSet>(config);
    return filtersSet;
}

} // namespace NFq::NRowDispatcher
Loading

0 comments on commit ba64816

Please sign in to comment.