Skip to content

Commit

Permalink
Passed unboxed values from filter to read actor
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 25, 2024
1 parent 5b45c74 commit 2a27bbb
Show file tree
Hide file tree
Showing 53 changed files with 3,890 additions and 2,044 deletions.
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "common.h"
#include "format_handler/common/common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
Expand Down
42 changes: 0 additions & 42 deletions ydb/core/fq/libs/row_dispatcher/common.cpp

This file was deleted.

25 changes: 0 additions & 25 deletions ydb/core/fq/libs/row_dispatcher/common.h

This file was deleted.

103 changes: 103 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "common.h"

#include <util/string/builder.h>

#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {

namespace {

class TPureCalcProgramFactory : public IPureCalcProgramFactory {
public:
TPureCalcProgramFactory() {
CreateFactory({.EnabledLLVM = false});
CreateFactory({.EnabledLLVM = true});
}

NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const override {
const auto it = ProgramFactories.find(settings);
Y_ENSURE(it != ProgramFactories.end());
return it->second;
}

TGuard<TMutex> LockFactory() const override {
return Guard(FactoryMutex);
}

private:
void CreateFactory(const TSettings& settings) {
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
NYql::NPureCalc::TProgramFactoryOptions()
.SetLLVMSettings(settings.EnabledLLVM ? "ON" : "OFF")
)});
}

private:
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
TMutex FactoryMutex;
};

} // anonymous namespace

//// TStatus

TStatus::TStatus()
: Status(EId::SUCCESS)
{}

TStatus::TStatus(TStatucCode status, NYql::TIssues issues)
: Status(status)
, Issues(std::move(issues))
{}

TStatus::TStatus(TStatucCode status, TString message)
: Status(status)
, Issues({NYql::TIssue(std::move(message))})
{}

bool TStatus::IsSuccess() const {
return Status == EId::SUCCESS;
}

TStatus::TStatucCode TStatus::GetStatus() const {
return Status;
}

const NYql::TIssues& TStatus::GetIssues() const {
return Issues;
}

TString TStatus::ToString() const {
return TStringBuilder() << "Status: " << NYql::NDq::DqStatusToYdbStatus(Status) << ", Issues: " << Issues.ToOneLineString();
}

TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {
for (const auto& childIssue : Issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(childIssue));
}
Issues = {std::move(issue)};
return *this;
}

TStatus& TStatus::AddParentIssue(TString message) {
return AddParentIssue(NYql::TIssue(std::move(message)));
}

TStatus& TStatus::AddIssue(NYql::TIssue issue) {
Issues.AddIssue(std::move(issue));
return *this;
}

TStatus& TStatus::AddIssue(TString message) {
return AddIssue(NYql::TIssue{std::move(message)});
}

//// IPureCalcProgramFactory

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory() {
return MakeIntrusive<TPureCalcProgramFactory>();
}

} // namespace NFq::NRowDispatcher
99 changes: 99 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#pragma once

#include <util/system/mutex.h>

#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

#include <yql/essentials/public/issue/yql_issue.h>

namespace NFq::NRowDispatcher {

struct TFormatHandlerException : public yexception {
using yexception::yexception;
};

class TStatus {
public:
using EId = NYql::NDqProto::StatusIds;
using TStatucCode = EId::StatusCode;

public:
TStatus();
explicit TStatus(TStatucCode status, NYql::TIssues issues = {});
TStatus(TStatucCode status, TString message);

virtual bool IsSuccess() const;

TStatucCode GetStatus() const;
const NYql::TIssues& GetIssues() const;

TString ToString() const;

TStatus& AddParentIssue(NYql::TIssue issue);
TStatus& AddParentIssue(TString message);

TStatus& AddIssue(NYql::TIssue issue);
TStatus& AddIssue(TString message);

protected:
TStatucCode Status;
NYql::TIssues Issues;
};

template <typename TValue>
class TValueStatus : public TStatus {
using TBase = TStatus;

public:
TValueStatus(TValue value)
: TBase(EId::SUCCESS)
, Value(std::move(value))
{}

TValueStatus(TStatus status)
: TBase(std::move(status))
{}

bool IsSuccess() const override {
return TBase::IsSuccess() && Value;
}

TValue& GetValue() {
if (Y_UNLIKELY(!Value)) {
throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString();
}
return *Value;
}

private:
std::optional<TValue> Value;
};

struct TSchemaColumn {
TString Name;
TString TypeYson;

bool operator==(const TSchemaColumn& other) const = default;
};

class IPureCalcProgramFactory : public TThrRefBase {
public:
using TPtr = TIntrusivePtr<IPureCalcProgramFactory>;

struct TSettings {
bool EnabledLLVM = false;

std::strong_ordering operator<=>(const TSettings& other) const = default;
};

public:
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;

// Before creating purecalc program factory should be locked
virtual TGuard<TMutex> LockFactory() const = 0;
};

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();

} // namespace NFq::NRowDispatcher
18 changes: 18 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
LIBRARY()

SRCS(
common.cpp
)

PEERDIR(
ydb/core/fq/libs/row_dispatcher/purecalc_no_pg_wrapper

ydb/library/yql/dq/actors
ydb/library/yql/dq/actors/protos

yql/essentials/public/issue
)

YQL_LAST_ABI_VERSION()

END()
Loading

0 comments on commit 2a27bbb

Please sign in to comment.