Skip to content

Commit

Permalink
Moved status to shared library
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 9, 2024
1 parent 624a559 commit f5feee1
Show file tree
Hide file tree
Showing 20 changed files with 290 additions and 328 deletions.
57 changes: 0 additions & 57 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,6 @@

namespace NFq::NRowDispatcher {

//// TStatus

TStatus::TStatus()
: Status(EId::SUCCESS)
{}

TStatus::TStatus(TStatusCode status, NYql::TIssues issues)
: Status(status)
, Issues(std::move(issues))
{}

TStatus::TStatus(TStatusCode status, TString message)
: Status(status)
, Issues({NYql::TIssue(std::move(message))})
{}

bool TStatus::IsSuccess() const {
return Status == EId::SUCCESS;
}

TStatus::operator bool() const {
return !IsSuccess();
}

TStatus::TStatusCode TStatus::GetStatus() const {
return Status;
}

const NYql::TIssues& TStatus::GetIssues() const {
return Issues;
}

TString TStatus::ToString() const {
return TStringBuilder() << "Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(Status) << ", Issues: " << Issues.ToOneLineString();
}

TStatus& TStatus::AddParentIssue(NYql::TIssue issue) {
for (const auto& childIssue : Issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(childIssue));
}
Issues = {std::move(issue)};
return *this;
}

TStatus& TStatus::AddParentIssue(TString message) {
return AddParentIssue(NYql::TIssue(std::move(message)));
}

TStatus& TStatus::AddIssue(NYql::TIssue issue) {
Issues.AddIssue(std::move(issue));
return *this;
}

TStatus& TStatus::AddIssue(TString message) {
return AddIssue(NYql::TIssue{std::move(message)});
}

//// TSchemaColumn

TString TSchemaColumn::ToString() const {
Expand Down
67 changes: 6 additions & 61 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
Original file line number Diff line number Diff line change
@@ -1,72 +1,17 @@
#pragma once

#include <ydb/library/conclusion/generic/result.h>
#include <ydb/library/conclusion/generic/status.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>

#include <yql/essentials/public/issue/yql_issue.h>

namespace NFq::NRowDispatcher {

struct TFormatHandlerException : public yexception {
using yexception::yexception;
};

class TStatus {
public:
using EId = NYql::NDqProto::StatusIds;
using TStatusCode = EId::StatusCode;

public:
TStatus();
explicit TStatus(TStatusCode status, NYql::TIssues issues = {});
TStatus(TStatusCode status, TString message);

virtual bool IsSuccess() const;
operator bool() const; // Equal to !IsSuccess

TStatusCode GetStatus() const;
const NYql::TIssues& GetIssues() const;

TString ToString() const;

TStatus& AddParentIssue(NYql::TIssue issue);
TStatus& AddParentIssue(TString message);

TStatus& AddIssue(NYql::TIssue issue);
TStatus& AddIssue(TString message);

protected:
TStatusCode Status;
NYql::TIssues Issues;
};
using EStatusId = NYql::NDqProto::StatusIds;
using TStatusCode = EStatusId::StatusCode;
using TStatus = NKikimr::TConclusionStatusIssueImpl<TStatusCode, EStatusId::SUCCESS, EStatusId::INTERNAL_ERROR>;

template <typename TValue>
class TValueStatus : public TStatus {
using TBase = TStatus;

public:
TValueStatus(TValue value)
: TBase(EId::SUCCESS)
, Value(std::move(value))
{}

TValueStatus(TStatus status)
: TBase(std::move(status))
{}

bool IsSuccess() const override {
return TBase::IsSuccess() && Value;
}

TValue& GetValue() {
if (Y_UNLIKELY(!Value)) {
throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString();
}
return *Value;
}

private:
std::optional<TValue> Value;
};
using TValueStatus = NKikimr::TConclusionImpl<TStatus, TValue>;

struct TSchemaColumn {
TString Name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
)

PEERDIR(
ydb/library/conclusion
ydb/library/yql/dq/actors/protos

yql/essentials/public/issue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class TTopicFiltersSet : public ITopicFiltersSet {
Self.Counters.InFlightCompileRequests->Dec();

if (!ev->Get()->ProgramHolder) {
auto status = TStatus(ev->Get()->Status, std::move(ev->Get()->Issues));
LOG_ROW_DISPATCHER_ERROR("Filter compilation error: " << status.ToString());
auto status = TStatus::Fail(ev->Get()->Status, std::move(ev->Get()->Issues));
LOG_ROW_DISPATCHER_ERROR("Filter compilation error: " << status.GetErrorMessage());

Self.Counters.CompileErrors->Inc();
Consumer->OnFilteringError(status.AddParentIssue("Failed to compile client filter"));
Expand Down Expand Up @@ -195,19 +195,19 @@ class TTopicFiltersSet : public ITopicFiltersSet {
IPurecalcFilter::TPtr purecalcFilter;
if (filter->GetWhereFilter()) {
auto filterStatus = CreatePurecalcFilter(filter);
if (!filterStatus.IsSuccess()) {
if (filterStatus.IsFail()) {
return filterStatus;
}
purecalcFilter = std::move(filterStatus.GetValue());
purecalcFilter = filterStatus.DetachResult();
}

const auto [it, inserted] = Filters.insert({filter->GetFilterId(), TFilterHandler(*this, filter, std::move(purecalcFilter))});
if (!inserted) {
return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new filter, filter with id " << filter->GetFilterId() << " already exists");
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new filter, filter with id " << filter->GetFilterId() << " already exists");
}

it->second.CompileFilter();
return TStatus();
return TStatus::Success();
}

void RemoveFilter(NActors::TActorId filterId) override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ TValueStatus<IPurecalcFilter::TPtr> CreatePurecalcFilter(IPurecalcFilterConsumer
try {
return IPurecalcFilter::TPtr(MakeIntrusive<TPurecalcFilter>(consumer));
} catch (...) {
return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create purecalc filter with predicate '" << consumer->GetWhereFilter() << "', got unexpected exception: " << CurrentExceptionMessage());
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create purecalc filter with predicate '" << consumer->GetWhereFilter() << "', got unexpected exception: " << CurrentExceptionMessage());
}
}

Expand Down
46 changes: 23 additions & 23 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void OnParsingError(TStatus status) override {
LOG_ROW_DISPATCHER_ERROR("Got parsing error: " << status.ToString());
LOG_ROW_DISPATCHER_ERROR("Got parsing error: " << status.GetErrorString());
Self.FatalError(status);
}

Expand All @@ -73,7 +73,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
for (size_t i = 0; i < ParerSchema.size(); ++i) {
auto columnStatus = Self.Parser->GetParsedColumn(i);
if (Y_LIKELY(columnStatus.IsSuccess())) {
Self.ParsedData[i] = columnStatus.GetValue();
Self.ParsedData[i] = columnStatus.DetachResult();
} else {
OnColumnError(i, columnStatus);
}
Expand All @@ -86,7 +86,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
private:
void OnColumnError(ui64 columnIndex, TStatus status) {
const auto& column = ParerSchema[columnIndex];
LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", " << status.ToString());
LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", " << status.GetErrorString());

const auto columnIt = Self.ColumnsDesc.find(column.Name);
if (columnIt == Self.ColumnsDesc.end()) {
Expand Down Expand Up @@ -132,14 +132,14 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

TStatus SetupColumns() {
if (Columns.empty()) {
return TStatus(TStatus::EId::INTERNAL_ERROR, "Client should have at least one column in schema");
return TStatus::Fail(EStatusId::INTERNAL_ERROR, "Client should have at least one column in schema");
}

for (const auto& column : Columns) {
const auto it = Self.ColumnsDesc.find(column.Name);
if (it != Self.ColumnsDesc.end()) {
if (it->second.TypeYson != column.TypeYson) {
return TStatus(TStatus::EId::SCHEME_ERROR, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << column.Name << "` is " << it->second.TypeYson << " (requested type is " << column.TypeYson <<")");
return TStatus::Fail(EStatusId::SCHEME_ERROR, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << column.Name << "` is " << it->second.TypeYson << " (requested type is " << column.TypeYson <<")");
}

it->second.Clients.emplace(Client->GetClientId());
Expand Down Expand Up @@ -169,7 +169,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void OnClientError(TStatus status) {
LOG_ROW_DISPATCHER_WARN("OnClientError, " << status.ToString());
LOG_ROW_DISPATCHER_WARN("OnClientError, " << status.GetErrorString());
Client->OnClientError(std::move(status));
}

Expand Down Expand Up @@ -242,17 +242,17 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
columnTypes.reserve(Columns.size());
for (const auto& column : Columns) {
auto status = Self.ParseTypeYson(column.TypeYson);
if (!status.IsSuccess()) {
if (status.IsFail()) {
return status;
}
columnTypes.emplace_back(status.GetValue());
columnTypes.emplace_back(status.DetachResult());
}

with_lock(Self.Alloc) {
const auto rowType = Self.ProgramBuilder->NewMultiType(columnTypes);
DataPacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true>>(rowType);
}
return TStatus();
return TStatus::Success();
}

void FinishPacking() {
Expand Down Expand Up @@ -334,7 +334,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

void HandleException(const std::exception& error) {
LOG_ROW_DISPATCHER_ERROR("Got unexpected exception: " << error.what());
FatalError(TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Format handler error, got unexpected exception: " << error.what()));
FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Format handler error, got unexpected exception: " << error.what()));
}

public:
Expand All @@ -345,7 +345,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Parser->ParseMessages(messages);
ScheduleRefresh();
} else if (!Clinets.empty()) {
FatalError(TStatus(TStatus::EId::INTERNAL_ERROR, "Failed to parse messages, expected empty clients set without parser"));
FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, "Failed to parse messages, expected empty clients set without parser"));
}
}

Expand All @@ -362,27 +362,27 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

auto clientHandler = MakeIntrusive<TClientHandler>(*this, client);
if (!Clinets.emplace(client->GetClientId(), clientHandler).second) {
return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists");
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists");
}
Counters.ActiveClients->Inc();

if (auto status = clientHandler->SetupColumns()) {
if (auto status = clientHandler->SetupColumns(); status.IsFail()) {
RemoveClient(client->GetClientId());
return status.AddParentIssue("Failed to modify common parsing schema");
}

if (auto status = UpdateParser()) {
if (auto status = UpdateParser(); status.IsFail()) {
RemoveClient(client->GetClientId());
return status.AddParentIssue("Failed to update parser with new client");
}

CreateFilters();
if (auto status = Filters->AddFilter(clientHandler)) {
if (auto status = Filters->AddFilter(clientHandler); status.IsFail()) {
RemoveClient(client->GetClientId());
return status.AddParentIssue("Failed to create filter for new client");
}

return TStatus();
return TStatus::Success();
}

void RemoveClient(NActors::TActorId clientId) override {
Expand Down Expand Up @@ -414,7 +414,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}
}

if (auto status = UpdateParser()) {
if (auto status = UpdateParser(); status.IsFail()) {
FatalError(status.AddParentIssue("Failed to update parser after removing client"));
}
}
Expand Down Expand Up @@ -455,7 +455,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

if (ParserHandler && parerSchema == ParserHandler->GetColumns()) {
return TStatus();
return TStatus::Success();
}

if (Parser) {
Expand All @@ -468,13 +468,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

if (const ui64 schemaSize = ParserHandler->GetColumns().size()) {
auto newParser = CreateParserForFormat();
if (!newParser.IsSuccess()) {
if (newParser.IsFail()) {
return newParser;
}

LOG_ROW_DISPATCHER_DEBUG("Parser was updated on new schema with " << schemaSize << " columns");

Parser = newParser.GetValue();
Parser = newParser.DetachResult();
ParserSchemaIndex.resize(MaxColumnId, std::numeric_limits<ui64>::max());
for (ui64 i = 0; const auto& [_, columnDesc] : ColumnsDesc) {
ParserSchemaIndex[columnDesc.ColumnId] = i++;
Expand All @@ -483,7 +483,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
LOG_ROW_DISPATCHER_INFO("No columns to parse, reset parser");
}

return TStatus();
return TStatus::Success();
}

TValueStatus<ITopicParser::TPtr> CreateParserForFormat() const {
Expand All @@ -493,7 +493,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
if (Settings.ParsingFormat == "json_each_row") {
return CreateJsonParser(ParserHandler, Config.JsonParserConfig);
}
return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat);
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat);
}

void CreateFilters() {
Expand Down Expand Up @@ -523,7 +523,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void FatalError(TStatus status) const {
LOG_ROW_DISPATCHER_ERROR("Got fatal error: " << status.ToString());
LOG_ROW_DISPATCHER_ERROR("Got fatal error: " << status.GetErrorString());
for (const auto& [_, client] : Clinets) {
client->OnClientError(status);
}
Expand Down
Loading

0 comments on commit f5feee1

Please sign in to comment.