Skip to content

Commit

Permalink
Supported unboxed values passing from parser to filters
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Oct 22, 2024
1 parent 6af3402 commit 838079f
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 254 deletions.
102 changes: 55 additions & 47 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
#include <ydb/library/yql/public/udf/udf_version.h>
#include <ydb/library/yql/public/purecalc/purecalc.h>
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
Expand All @@ -23,6 +24,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
.Add(fieldType);
}

// Builds the type node ["OptionalType", <type node for fieldType>],
// where the inner node is produced by CreateTypeNode(fieldType).
NYT::TNode CreateOptionalTypeNode(const TString& fieldType) {
    auto optionalNode = NYT::TNode::CreateList();
    optionalNode.Add("OptionalType");
    optionalNode.Add(CreateTypeNode(fieldType));
    return optionalNode;
}

void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(
NYT::TNode::CreateList()
Expand All @@ -31,18 +38,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
);
}

void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(NYT::TNode::CreateList()
.Add(fieldName)
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
// Appends the struct member [fieldName, <type>] to `node`.
//
// `fieldTypeYson` is a textual type description that is parsed into a type node
// with NYql::NCommon::ParseYson (Cerr is handed to the parser as its error stream).
// Throws via Y_ENSURE when the description cannot be parsed; the message names
// the offending column and type so the failure can be traced to the schema.
void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
    NYT::TNode parsedType;
    Y_ENSURE(
        NYql::NCommon::ParseYson(parsedType, fieldTypeYson, Cerr),
        "Invalid field type '" << fieldTypeYson << "' for column '" << fieldName << "'"
    );

    // TODO: remove this when the re-parsing is removed from pq read actor
    if (parsedType == CreateTypeNode("Json")) {
        parsedType = CreateTypeNode("String");
    } else if (parsedType == CreateOptionalTypeNode("Json")) {
        parsedType = CreateOptionalTypeNode("String");
    }

    // parsedType is not used after this point, so move it instead of deep-copying the node.
    node.Add(
        NYT::TNode::CreateList()
            .Add(fieldName)
            .Add(std::move(parsedType))
    );
}

NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
NYT::TNode MakeInputSchema(const TVector<TString>& columns, const TVector<TString>& types) {
auto structMembers = NYT::TNode::CreateList();
AddField(structMembers, OffsetFieldName, "Uint64");
for (const auto& col : columns) {
AddOptionalField(structMembers, col, "String");
for (size_t i = 0; i < columns.size(); ++i) {
AddTypedField(structMembers, columns[i], types[i]);
}
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
}
Expand All @@ -68,7 +86,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -106,15 +124,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
}
}

void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;
with_lock (Worker->GetScopedAlloc()) {
auto& holderFactory = Worker->GetGraph().GetHolderFactory();

// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
Expand All @@ -126,13 +144,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T

size_t fieldId = 0;
for (const auto& column : values.second) {
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
: NKikimr::NUdf::TUnboxedValuePod();
items[FieldsPositions[fieldId++]] = column->at(rowId);
}

Worker->Push(std::move(result));
}

// Clear the cache after each object because the values are
// allocated on another allocator and should be released
Cache.Clear();
}
}

Expand Down Expand Up @@ -216,7 +236,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand All @@ -243,13 +263,19 @@ class TJsonFilter::TImpl {
TImpl(const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
: Sql(GenerateSql(columns, types, whereFilter)) {
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());
TCallback callback,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc)
: Sql(GenerateSql(whereFilter)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
auto factory = NYql::NPureCalc::MakeProgramFactory(
NYql::NPureCalc::TProgramFactoryOptions().SetScopedAlloc(std::move(alloc))
);

// Program should be stateless because input values are
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = factory->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns)),
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
NYql::NPureCalc::ETranslationMode::SQL
Expand All @@ -258,7 +284,7 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offsets, values));
}
Expand All @@ -268,29 +294,9 @@ class TJsonFilter::TImpl {
}

private:
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
TString GenerateSql(const TString& whereFilter) {
TStringStream str;
str << "$fields = SELECT ";
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
str << OffsetFieldName << ", ";
for (size_t i = 0; i < columnNames.size(); ++i) {
TString columnType = columnTypes[i];
TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`');
if (columnType == "Json") {
columnType = "String";
} else if (columnType == "Optional<Json>") {
columnType = "Optional<String>";
}

if (columnType.StartsWith("Optional")) {
str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)";
} else {
str << "Unwrap(CAST(" << columnName << " as " << columnType << "))";
}
str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : "");
}
str << " FROM Input;\n";
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";

str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
str << "\"])))) as data FROM $filtered";
Expand All @@ -300,22 +306,23 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
const TString Sql;
};

TJsonFilter::TJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback)) {
TCallback callback,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, std::move(alloc))) {
}

// Out-of-line destructor with no custom logic; presumably kept in this translation
// unit so that Impl is destroyed where TJsonFilter::TImpl is a complete type.
TJsonFilter::~TJsonFilter() = default;

void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Impl->Push(offsets, values);
}

Expand All @@ -327,8 +334,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback));
TCallback callback,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, std::move(alloc)));
}

} // namespace NFq
11 changes: 6 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>

namespace NFq {

Expand All @@ -14,11 +13,12 @@ class TJsonFilter {
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback);
TCallback callback,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc);

~TJsonFilter();

void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
TString GetSql();

private:
Expand All @@ -30,6 +30,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TJsonFilter::TCallback callback);
TJsonFilter::TCallback callback,
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc);

} // namespace NFq
Loading

0 comments on commit 838079f

Please sign in to comment.