From f5feee1e4fd4b51272ecf24f8f7b294ab49a6fd0 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 9 Dec 2024 20:44:03 +0000 Subject: [PATCH] Moved status to shared library --- .../format_handler/common/common.cpp | 57 ------- .../format_handler/common/common.h | 67 +-------- .../format_handler/common/ya.make | 1 + .../format_handler/filters/filters_set.cpp | 12 +- .../filters/purecalc_filter.cpp | 2 +- .../format_handler/format_handler.cpp | 46 +++--- .../format_handler/parsers/json_parser.cpp | 70 ++++----- .../format_handler/parsers/parser_base.cpp | 8 +- .../format_handler/parsers/raw_parser.cpp | 23 +-- .../format_handler/ut/common/ut_common.cpp | 8 +- .../format_handler/ut/common/ut_common.h | 6 +- .../format_handler/ut/format_handler_ut.cpp | 20 +-- .../format_handler/ut/topic_filter_ut.cpp | 31 ++-- .../format_handler/ut/topic_parser_ut.cpp | 62 ++++---- .../purecalc_compilation/compile_service.cpp | 11 +- .../fq/libs/row_dispatcher/topic_session.cpp | 18 +-- .../row_dispatcher/ut/topic_session_ut.cpp | 16 +- ydb/library/conclusion/generic/result.h | 16 +- ydb/library/conclusion/generic/status.h | 142 +++++++++++++----- ydb/library/conclusion/generic/ya.make | 2 + 20 files changed, 290 insertions(+), 328 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp index bad8b3286fb2..8340bbf62f19 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp @@ -4,63 +4,6 @@ namespace NFq::NRowDispatcher { -//// TStatus - -TStatus::TStatus() - : Status(EId::SUCCESS) -{} - -TStatus::TStatus(TStatusCode status, NYql::TIssues issues) - : Status(status) - , Issues(std::move(issues)) -{} - -TStatus::TStatus(TStatusCode status, TString message) - : Status(status) - , Issues({NYql::TIssue(std::move(message))}) -{} - -bool TStatus::IsSuccess() const { - return Status == EId::SUCCESS; -} - -TStatus::operator bool() const { - return !IsSuccess(); -} - -TStatus::TStatusCode TStatus::GetStatus() const { - return Status; -} - -const NYql::TIssues& TStatus::GetIssues() const { - return Issues; -} - -TString TStatus::ToString() const { - return TStringBuilder() << "Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(Status) << ", Issues: " << Issues.ToOneLineString(); -} - -TStatus& TStatus::AddParentIssue(NYql::TIssue issue) { - for (const auto& childIssue : Issues) { - issue.AddSubIssue(MakeIntrusive(childIssue)); - } - Issues = {std::move(issue)}; - return *this; -} - -TStatus& TStatus::AddParentIssue(TString message) { - return AddParentIssue(NYql::TIssue(std::move(message))); -} - -TStatus& TStatus::AddIssue(NYql::TIssue issue) { - Issues.AddIssue(std::move(issue)); - return *this; -} - -TStatus& TStatus::AddIssue(TString message) { - return AddIssue(NYql::TIssue{std::move(message)}); -} - //// TSchemaColumn TString TSchemaColumn::ToString() const { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h index 83999cac5d64..eef669f5bfaa 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h @@ -1,72 +1,17 @@ #pragma once +#include +#include #include -#include - namespace NFq::NRowDispatcher { -struct TFormatHandlerException : public yexception { - using yexception::yexception; -}; - -class TStatus { -public: - using EId = NYql::NDqProto::StatusIds; - using TStatusCode = EId::StatusCode; - -public: - TStatus(); - explicit TStatus(TStatusCode status, NYql::TIssues issues = {}); - TStatus(TStatusCode status, TString message); - - virtual bool IsSuccess() const; - operator bool() const; // Equal to !IsSuccess - - TStatusCode GetStatus() const; - const NYql::TIssues& GetIssues() const; - - TString ToString() const; - - TStatus& AddParentIssue(NYql::TIssue issue); - TStatus& AddParentIssue(TString message); - - TStatus& AddIssue(NYql::TIssue issue); - TStatus& AddIssue(TString message); - -protected: - TStatusCode Status; - NYql::TIssues Issues; -}; +using EStatusId = NYql::NDqProto::StatusIds; +using TStatusCode = EStatusId::StatusCode; +using TStatus = NKikimr::TConclusionStatusIssueImpl; template -class TValueStatus : public TStatus { - using TBase = TStatus; - -public: - TValueStatus(TValue value) - : TBase(EId::SUCCESS) - , Value(std::move(value)) - {} - - TValueStatus(TStatus status) - : TBase(std::move(status)) - {} - - bool IsSuccess() const override { - return TBase::IsSuccess() && Value; - } - - TValue& GetValue() { - if (Y_UNLIKELY(!Value)) { - throw TFormatHandlerException() << "Internal error, failed to get value, " << ToString(); - } - return *Value; - } - -private: - std::optional Value; -}; +using TValueStatus = NKikimr::TConclusionImpl; struct TSchemaColumn { TString Name; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make b/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make index a8d9ca282389..e4d689c0a1aa 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make @@ -5,6 +5,7 @@ SRCS( ) PEERDIR( + ydb/library/conclusion ydb/library/yql/dq/actors/protos yql/essentials/public/issue diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp index 3f705fee603b..e47ed20a0aa7 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp @@ -94,8 +94,8 @@ class TTopicFiltersSet : public ITopicFiltersSet { Self.Counters.InFlightCompileRequests->Dec(); if (!ev->Get()->ProgramHolder) { - auto status = TStatus(ev->Get()->Status, std::move(ev->Get()->Issues)); - LOG_ROW_DISPATCHER_ERROR("Filter compilation error: " << status.ToString()); + auto status = TStatus::Fail(ev->Get()->Status, std::move(ev->Get()->Issues)); + LOG_ROW_DISPATCHER_ERROR("Filter compilation error: " << status.GetErrorMessage()); Self.Counters.CompileErrors->Inc(); Consumer->OnFilteringError(status.AddParentIssue("Failed to compile client filter")); @@ -195,19 +195,19 @@ class TTopicFiltersSet : public ITopicFiltersSet { IPurecalcFilter::TPtr purecalcFilter; if (filter->GetWhereFilter()) { auto filterStatus = CreatePurecalcFilter(filter); - if (!filterStatus.IsSuccess()) { + if (filterStatus.IsFail()) { return filterStatus; } - purecalcFilter = std::move(filterStatus.GetValue()); + purecalcFilter = filterStatus.DetachResult(); } const auto [it, inserted] = Filters.insert({filter->GetFilterId(), TFilterHandler(*this, filter, std::move(purecalcFilter))}); if (!inserted) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new filter, filter with id " << filter->GetFilterId() << " already exists"); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new filter, filter with id " << filter->GetFilterId() << " already exists"); } it->second.CompileFilter(); - return TStatus(); + return TStatus::Success(); } void RemoveFilter(NActors::TActorId filterId) override { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp index 93ad5dbbf3a7..be4ce2a97d50 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp @@ -339,7 +339,7 @@ TValueStatus CreatePurecalcFilter(IPurecalcFilterConsumer try { return IPurecalcFilter::TPtr(MakeIntrusive(consumer)); } catch (...) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create purecalc filter with predicate '" << consumer->GetWhereFilter() << "', got unexpected exception: " << CurrentExceptionMessage()); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create purecalc filter with predicate '" << consumer->GetWhereFilter() << "', got unexpected exception: " << CurrentExceptionMessage()); } } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 547206667eb5..7f6a5e42ed68 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -62,7 +62,7 @@ class TTopicFormatHandler : public NActors::TActor, public } void OnParsingError(TStatus status) override { - LOG_ROW_DISPATCHER_ERROR("Got parsing error: " << status.ToString()); + LOG_ROW_DISPATCHER_ERROR("Got parsing error: " << status.GetErrorString()); Self.FatalError(status); } @@ -73,7 +73,7 @@ class TTopicFormatHandler : public NActors::TActor, public for (size_t i = 0; i < ParerSchema.size(); ++i) { auto columnStatus = Self.Parser->GetParsedColumn(i); if (Y_LIKELY(columnStatus.IsSuccess())) { - Self.ParsedData[i] = columnStatus.GetValue(); + Self.ParsedData[i] = columnStatus.DetachResult(); } else { OnColumnError(i, columnStatus); } @@ -86,7 +86,7 @@ class TTopicFormatHandler : public NActors::TActor, public private: void OnColumnError(ui64 columnIndex, TStatus status) { const auto& column = ParerSchema[columnIndex]; - LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", " << status.ToString()); + LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", " << status.GetErrorString()); const auto columnIt = Self.ColumnsDesc.find(column.Name); if (columnIt == Self.ColumnsDesc.end()) { @@ -132,14 +132,14 @@ class TTopicFormatHandler : public NActors::TActor, public TStatus SetupColumns() { if (Columns.empty()) { - return TStatus(TStatus::EId::INTERNAL_ERROR, "Client should have at least one column in schema"); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, "Client should have at least one column in schema"); } for (const auto& column : Columns) { const auto it = Self.ColumnsDesc.find(column.Name); if (it != Self.ColumnsDesc.end()) { if (it->second.TypeYson != column.TypeYson) { - return TStatus(TStatus::EId::SCHEME_ERROR, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << column.Name << "` is " << it->second.TypeYson << " (requested type is " << column.TypeYson <<")"); + return TStatus::Fail(EStatusId::SCHEME_ERROR, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << column.Name << "` is " << it->second.TypeYson << " (requested type is " << column.TypeYson <<")"); } it->second.Clients.emplace(Client->GetClientId()); @@ -169,7 +169,7 @@ class TTopicFormatHandler : public NActors::TActor, public } void OnClientError(TStatus status) { - LOG_ROW_DISPATCHER_WARN("OnClientError, " << status.ToString()); + LOG_ROW_DISPATCHER_WARN("OnClientError, " << status.GetErrorString()); Client->OnClientError(std::move(status)); } @@ -242,17 +242,17 @@ class TTopicFormatHandler : public NActors::TActor, public columnTypes.reserve(Columns.size()); for (const auto& column : Columns) { auto status = Self.ParseTypeYson(column.TypeYson); - if (!status.IsSuccess()) { + if (status.IsFail()) { return status; } - columnTypes.emplace_back(status.GetValue()); + columnTypes.emplace_back(status.DetachResult()); } with_lock(Self.Alloc) { const auto rowType = Self.ProgramBuilder->NewMultiType(columnTypes); DataPacker = std::make_unique>(rowType); } - return TStatus(); + return TStatus::Success(); } void FinishPacking() { @@ -334,7 +334,7 @@ class TTopicFormatHandler : public NActors::TActor, public void HandleException(const std::exception& error) { LOG_ROW_DISPATCHER_ERROR("Got unexpected exception: " << error.what()); - FatalError(TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Format handler error, got unexpected exception: " << error.what())); + FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Format handler error, got unexpected exception: " << error.what())); } public: @@ -345,7 +345,7 @@ class TTopicFormatHandler : public NActors::TActor, public Parser->ParseMessages(messages); ScheduleRefresh(); } else if (!Clinets.empty()) { - FatalError(TStatus(TStatus::EId::INTERNAL_ERROR, "Failed to parse messages, expected empty clients set without parser")); + FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, "Failed to parse messages, expected empty clients set without parser")); } } @@ -362,27 +362,27 @@ class TTopicFormatHandler : public NActors::TActor, public auto clientHandler = MakeIntrusive(*this, client); if (!Clinets.emplace(client->GetClientId(), clientHandler).second) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists"); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists"); } Counters.ActiveClients->Inc(); - if (auto status = clientHandler->SetupColumns()) { + if (auto status = clientHandler->SetupColumns(); status.IsFail()) { RemoveClient(client->GetClientId()); return status.AddParentIssue("Failed to modify common parsing schema"); } - if (auto status = UpdateParser()) { + if (auto status = UpdateParser(); status.IsFail()) { RemoveClient(client->GetClientId()); return status.AddParentIssue("Failed to update parser with new client"); } CreateFilters(); - if (auto status = Filters->AddFilter(clientHandler)) { + if (auto status = Filters->AddFilter(clientHandler); status.IsFail()) { RemoveClient(client->GetClientId()); return status.AddParentIssue("Failed to create filter for new client"); } - return TStatus(); + return TStatus::Success(); } void RemoveClient(NActors::TActorId clientId) override { @@ -414,7 +414,7 @@ class TTopicFormatHandler : public NActors::TActor, public } } - if (auto status = UpdateParser()) { + if (auto status = UpdateParser(); status.IsFail()) { FatalError(status.AddParentIssue("Failed to update parser after removing client")); } } @@ -455,7 +455,7 @@ class TTopicFormatHandler : public NActors::TActor, public } if (ParserHandler && parerSchema == ParserHandler->GetColumns()) { - return TStatus(); + return TStatus::Success(); } if (Parser) { @@ -468,13 +468,13 @@ class TTopicFormatHandler : public NActors::TActor, public if (const ui64 schemaSize = ParserHandler->GetColumns().size()) { auto newParser = CreateParserForFormat(); - if (!newParser.IsSuccess()) { + if (newParser.IsFail()) { return newParser; } LOG_ROW_DISPATCHER_DEBUG("Parser was updated on new schema with " << schemaSize << " columns"); - Parser = newParser.GetValue(); + Parser = newParser.DetachResult(); ParserSchemaIndex.resize(MaxColumnId, std::numeric_limits::max()); for (ui64 i = 0; const auto& [_, columnDesc] : ColumnsDesc) { ParserSchemaIndex[columnDesc.ColumnId] = i++; @@ -483,7 +483,7 @@ class TTopicFormatHandler : public NActors::TActor, public LOG_ROW_DISPATCHER_INFO("No columns to parse, reset parser"); } - return TStatus(); + return TStatus::Success(); } TValueStatus CreateParserForFormat() const { @@ -493,7 +493,7 @@ class TTopicFormatHandler : public NActors::TActor, public if (Settings.ParsingFormat == "json_each_row") { return CreateJsonParser(ParserHandler, Config.JsonParserConfig); } - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat); } void CreateFilters() { @@ -523,7 +523,7 @@ class TTopicFormatHandler : public NActors::TActor, public } void FatalError(TStatus status) const { - LOG_ROW_DISPATCHER_ERROR("Got fatal error: " << status.ToString()); + LOG_ROW_DISPATCHER_ERROR("Got fatal error: " << status.GetErrorString()); for (const auto& [_, client] : Clinets) { client->OnClientError(status); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp index 6a280727ecf8..9b241c9635d9 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp @@ -76,6 +76,7 @@ class TColumnParser { TColumnParser(const TString& name, const TString& typeYson, ui64 maxNumberRows) : Name(name) , TypeYson(typeYson) + , Status(TStatus::Success()) { ParsedRows.reserve(maxNumberRows); } @@ -93,7 +94,7 @@ class TColumnParser { } void ParseJsonValue(ui64 offset, ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { - if (Y_UNLIKELY(!Status.IsSuccess())) { + if (Y_UNLIKELY(Status.IsFail())) { return; } ParsedRows.emplace_back(rowId); @@ -108,23 +109,23 @@ class TColumnParser { resultValue = resultValue.MakeOptional(); } - if (Y_UNLIKELY(!Status.IsSuccess())) { + if (Y_UNLIKELY(Status.IsFail())) { Status.AddParentIssue(TStringBuilder() << "Failed to parse json string at offset " << offset << ", got parsing error for column '" << Name << "' with type " << TypeYson); } } void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) { - if (!Status.IsSuccess()) { + if (Status.IsFail()) { return; } if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) { - Status = TStatus(TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson); + Status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson); } } void ClearParsedRows() { ParsedRows.clear(); - Status = TStatus(); + Status = TStatus::Success(); } private: @@ -132,23 +133,24 @@ class TColumnParser { switch (type->GetKind()) { case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { auto slotStatus = GetDataSlot(type); - if (slotStatus.IsSuccess()) { - DataSlot = slotStatus.GetValue(); - DataTypeName = NYql::NUdf::GetDataTypeInfo(DataSlot).Name; + if (slotStatus.IsFail()) { + return slotStatus; } - return slotStatus; + DataSlot = slotStatus.DetachResult(); + DataTypeName = NYql::NUdf::GetDataTypeInfo(DataSlot).Name; + return TStatus::Success(); } case NKikimr::NMiniKQL::TTypeBase::EKind::Optional: { if (IsOptional) { - return TStatus(TStatus::EId::UNSUPPORTED, TStringBuilder() << "Nested optionals is not supported as input type"); + return TStatus::Fail(EStatusId::UNSUPPORTED, TStringBuilder() << "Nested optionals is not supported as input type"); } IsOptional = true; return ExtractDataSlot(AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType()); } default: { - return TStatus(TStatus::EId::UNSUPPORTED, TStringBuilder() << "Unsupported type kind: " << type->GetKindAsStr()); + return TStatus::Fail(EStatusId::UNSUPPORTED, TStringBuilder() << "Unsupported type kind: " << type->GetKindAsStr()); } } } @@ -196,10 +198,10 @@ class TColumnParser { break; default: - status = TStatus(TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Number value is not expected for data type " << DataTypeName); + status = TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Number value is not expected for data type " << DataTypeName); break; } - if (Y_UNLIKELY(!status.IsSuccess())) { + if (Y_UNLIKELY(status.IsFail())) { status.AddParentIssue(TStringBuilder() << "Failed to parse data type " << DataTypeName << " from json number (raw: '" << TruncateString(jsonValue.raw_json_token()) << "')"); } return; @@ -213,7 +215,7 @@ class TColumnParser { resultValue = LockObject(NKikimr::NMiniKQL::ValueFromString(DataSlot, rawString)); if (Y_UNLIKELY(!resultValue)) { - status = TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse data type " << DataTypeName << " from json string: '" << TruncateString(rawString) << "'"); + status = TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse data type " << DataTypeName << " from json string: '" << TruncateString(rawString) << "'"); } return; } @@ -224,13 +226,13 @@ class TColumnParser { CHECK_JSON_ERROR(jsonValue.raw_json().get(rawJson)) { return GetParsingError(error, jsonValue, "extract json value", status); } - status = TStatus(TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " < Y_FORCE_INLINE static void ParseJsonNumber(simdjson::simdjson_result jsonNumber, NYql::NUdf::TUnboxedValue& resultValue, TStatus& status) { CHECK_JSON_ERROR(jsonNumber.error()) { - status = TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to extract json integer number, error: " << simdjson::error_message(error)); + status = TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to extract json integer number, error: " << simdjson::error_message(error)); return; } TJsonNumber number = jsonNumber.value(); if (Y_UNLIKELY(number < std::numeric_limits::min() || std::numeric_limits::max() < number)) { - status = TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Number is out of range [" << ToString(std::numeric_limits::min()) << ", " << ToString(std::numeric_limits::max()) << "]"); + status = TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Number is out of range [" << ToString(std::numeric_limits::min()) << ", " << ToString(std::numeric_limits::max()) << "]"); return; } @@ -288,7 +290,7 @@ class TColumnParser { template Y_FORCE_INLINE static void ParseJsonDouble(simdjson::simdjson_result jsonNumber, NYql::NUdf::TUnboxedValue& resultValue, TStatus& status) { CHECK_JSON_ERROR(jsonNumber.error()) { - status = TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to extract json float number, error: " << simdjson::error_message(error)); + status = TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to extract json float number, error: " << simdjson::error_message(error)); return; } @@ -296,7 +298,7 @@ class TColumnParser { } static void GetParsingError(simdjson::error_code error, simdjson::builtin::ondemand::value jsonValue, const TString& description, TStatus& status) { - status = TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to " << description << ", current token: '" << TruncateString(jsonValue.raw_json_token()) << "', error: " << simdjson::error_message(error)); + status = TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to " << description << ", current token: '" << TruncateString(jsonValue.raw_json_token()) << "', error: " << simdjson::error_message(error)); } private: @@ -345,14 +347,14 @@ class TJsonParser : public TTopicParserBase { TStatus InitColumnsParsers() { for (auto& column : Columns) { auto typeStatus = ParseTypeYson(column.TypeYson); - if (!typeStatus.IsSuccess()) { - return typeStatus.AddParentIssue(TStringBuilder() << "Failed to parse column '" << column.Name << "' type " << column.TypeYson); + if (typeStatus.IsFail()) { + return TStatus(typeStatus).AddParentIssue(TStringBuilder() << "Failed to parse column '" << column.Name << "' type " << column.TypeYson); } - if (auto status = column.InitParser(typeStatus.GetValue())) { + if (auto status = column.InitParser(typeStatus.DetachResult()); status.IsFail()) { return status.AddParentIssue(TStringBuilder() << "Failed to create parser for column '" << column.Name << "' with type " << column.TypeYson); } } - return TStatus(); + return TStatus::Success(); } public: @@ -396,7 +398,7 @@ class TJsonParser : public TTopicParserBase { } TValueStatus*> GetParsedColumn(ui64 columnId) const override { - if (auto status = Columns[columnId].GetStatus()) { + if (auto status = Columns[columnId].GetStatus(); status.IsFail()) { return status; } return &ParsedValues[columnId]; @@ -412,23 +414,23 @@ class TJsonParser : public TTopicParserBase { simdjson::ondemand::document_stream documents; CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) { - return TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error)); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error)); } size_t rowId = 0; for (auto document : documents) { if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1); } const ui64 offset = Buffer.Offsets[rowId]; CHECK_JSON_ERROR(document.error()) { - return TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error)); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error)); } for (auto item : document.get_object()) { CHECK_JSON_ERROR(item.error()) { - return TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error)); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error)); } const auto it = ColumnsIndex.find(item.escaped_key().value()); @@ -443,7 +445,7 @@ class TJsonParser : public TTopicParserBase { } if (Y_UNLIKELY(rowId != Buffer.NumberValues)) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId); } const ui64 firstOffset = Buffer.Offsets.front(); @@ -451,7 +453,7 @@ class TJsonParser : public TTopicParserBase { column.ValidateNumberValues(rowId, firstOffset); } - return TStatus(); + return TStatus::Success(); } void ClearBuffer() override { @@ -483,7 +485,7 @@ class TJsonParser : public TTopicParserBase { TValueStatus CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config) { TJsonParser::TPtr parser = MakeIntrusive(consumer, config); - if (auto status = parser->InitColumnsParsers()) { + if (auto status = parser->InitColumnsParsers(); status.IsFail()) { return status; } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp index b876894b9f37..c7ee611a0d64 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.cpp @@ -35,7 +35,7 @@ TValueStatus TTypeParser::ParseTypeYson(const TString } if (!typeMkql) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse type from yson: " << parseTypeError); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse type from yson: " << parseTypeError); } return typeMkql; } @@ -84,7 +84,7 @@ void TTopicParserBase::ParseBuffer() { } }; - TStatus status; + TStatus status = TStatus::Success(); const TInstant startParse = TInstant::Now(); with_lock(Alloc) { status = DoParsing(); @@ -97,7 +97,7 @@ void TTopicParserBase::ParseBuffer() { Consumer->OnParsingError(status); } } catch (...) { - Consumer->OnParsingError(TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse messages from offset " << GetOffsets().front() << ", got unexpected exception: " << CurrentExceptionMessage())); + Consumer->OnParsingError(TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse messages from offset " << GetOffsets().front() << ", got unexpected exception: " << CurrentExceptionMessage())); } Stats.AddParseAndFilterLatency(TInstant::Now() - startParseAndFilter); } @@ -129,7 +129,7 @@ TValueStatus GetDataSlot(const NKikimr::NMiniKQL::TType* if (const auto dataSlot = dataType->GetDataSlot()) { return *dataSlot; } - return TStatus(TStatus::EId::UNSUPPORTED, TStringBuilder() << "Unsupported data type with id: " << dataType->GetSchemeType()); + return TStatus::Fail(EStatusId::UNSUPPORTED, TStringBuilder() << "Unsupported data type with id: " << dataType->GetSchemeType()); } TString TruncateString(std::string_view rawString, size_t maxSize) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp index 242bbec314ca..b230b8dd0a50 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.cpp @@ -25,27 +25,28 @@ class TRawParser : public TTopicParserBase { TStatus InitColumnParser() { auto typeStatus = ParseTypeYson(Schema.TypeYson); - if (!typeStatus.IsSuccess()) { + if (typeStatus.IsFail()) { return typeStatus; } - for (NKikimr::NMiniKQL::TType* type = typeStatus.GetValue(); true; type = AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType()) { + for (NKikimr::NMiniKQL::TType* type = typeStatus.DetachResult(); true; type = AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType()) { if (type->GetKind() == NKikimr::NMiniKQL::TTypeBase::EKind::Data) { auto slotStatus = GetDataSlot(type); - if (slotStatus.IsSuccess()) { - DataSlot = slotStatus.GetValue(); + if (slotStatus.IsFail()) { + return slotStatus; } - return slotStatus; + DataSlot = slotStatus.DetachResult(); + return TStatus::Success(); } if (type->GetKind() != NKikimr::NMiniKQL::TTypeBase::EKind::Optional) { - return TStatus(TStatus::EId::UNSUPPORTED, TStringBuilder() << "Unsupported type kind for raw format: " << type->GetKindAsStr()); + return TStatus::Fail(EStatusId::UNSUPPORTED, TStringBuilder() << "Unsupported type kind for raw format: " << type->GetKindAsStr()); } NumberOptionals++; } - return TStatus(); + return TStatus::Success(); } public: @@ -78,11 +79,11 @@ class TRawParser : public TTopicParserBase { value = value.MakeOptional(); } } else if (!NumberOptionals) { - return TStatus(TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse massege at offset " << Offsets.back() << ", can't parse data type " << NYql::NUdf::GetDataTypeInfo(DataSlot).Name << " from string: '" << TruncateString(CurrentMessage) << "'"); + return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse massege at offset " << Offsets.back() << ", can't parse data type " << NYql::NUdf::GetDataTypeInfo(DataSlot).Name << " from string: '" << TruncateString(CurrentMessage) << "'"); } ParsedColumn.emplace_back(std::move(value)); - return TStatus(); + return TStatus::Success(); } void ClearBuffer() override { @@ -110,11 +111,11 @@ class TRawParser : public TTopicParserBase { TValueStatus CreateRawParser(IParsedDataConsumer::TPtr consumer) { const auto& columns = consumer->GetColumns(); if (columns.size() != 1) { - return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Expected only one column for raw format, but got " << columns.size()); + return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Expected only one column for raw format, but got " << columns.size()); } TRawParser::TPtr parser = MakeIntrusive(consumer, columns[0]); - if (auto status = parser->InitColumnParser()) { + if (auto status = parser->InitColumnParser(); status.IsFail()) { return status.AddParentIssue(TStringBuilder() << "Failed to create raw parser for column " << columns[0].ToString()); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp index 23cfad654d0b..5614637c9598 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp @@ -240,12 +240,12 @@ NActors::IActor* CreatePurecalcCompileServiceMock(NActors::TActorId owner) { } void CheckSuccess(const TStatus& status) { - UNIT_ASSERT_C(status.IsSuccess(), TStringBuilder() << "Status is not success, " << status.ToString()); + UNIT_ASSERT_C(status.IsSuccess(), "Status is not success, " << status.GetErrorString()); } -void CheckError(const TStatus& status, TStatus::TStatusCode expectedStatusCode, const TString& expectedMessage) { - UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, TStringBuilder() << "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << status.ToString()); - UNIT_ASSERT_STRING_CONTAINS_C(status.GetIssues().ToOneLineString(), expectedMessage, TStringBuilder() << "Unexpected error message, Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus())); +void CheckError(const TStatus& status, TStatusCode expectedStatusCode, const TString& expectedMessage) { + UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << status.GetErrorString()); + UNIT_ASSERT_STRING_CONTAINS_C(status.GetErrorMessage().ToOneLineString(), expectedMessage, "Unexpected error message, Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus())); } } // namespace NFq::NRowDispatcher::NTests diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h index df5f5049192f..276c3ed7f094 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h @@ -76,12 +76,12 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser { NActors::IActor* CreatePurecalcCompileServiceMock(NActors::TActorId owner); void CheckSuccess(const TStatus& status); -void CheckError(const TStatus& status, TStatus::TStatusCode expectedStatusCode, const TString& expectedMessage); +void CheckError(const TStatus& status, TStatusCode expectedStatusCode, const TString& expectedMessage); template TValue CheckSuccess(TValueStatus valueStatus) { - CheckSuccess(TStatus(valueStatus)); - return std::move(valueStatus.GetValue()); + UNIT_ASSERT_C(valueStatus.IsSuccess(), "Value status is not success, " << valueStatus.GetErrorString()); + return valueStatus.DetachResult(); } } // namespace NFq::NRowDispatcher::NTests diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index 4ec8b1b5f086..17ec09099769 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -37,7 +37,7 @@ class TFormatHadlerFixture : public TBaseFixture { HasData = false; } - void ExpectError(TStatus::TStatusCode statusCode, const TString& message) { + void ExpectError(TStatusCode statusCode, const TString& message) { UNIT_ASSERT_C(!ExpectedError, "Can not add existing error, client id: " << ClientId); ExpectedError = {statusCode, message}; } @@ -78,7 +78,7 @@ class TFormatHadlerFixture : public TBaseFixture { } void OnClientError(TStatus status) override { - UNIT_ASSERT_C(!Offsets.empty(), "Unexpected message batch, status: " << status.ToString() << ", client id: " << ClientId); + UNIT_ASSERT_C(!Offsets.empty(), "Unexpected message batch, status: " << status.GetErrorMessage() << ", client id: " << ClientId); if (ExpectedError) { CheckError(status, ExpectedError->first, ExpectedError->second); @@ -123,7 +123,7 @@ class TFormatHadlerFixture : public TBaseFixture { bool Frozen = false; ui64 ExpectedFilteredRows = 0; std::queue Offsets; - std::optional> ExpectedError; + std::optional> ExpectedError; }; public: @@ -160,7 +160,7 @@ class TFormatHadlerFixture : public TBaseFixture { auto client = MakeIntrusive(ClientIds.back(), columns, whereFilter, callback, expectedFilteredRows); auto status = FormatHandler->AddClient(client); - if (!status.IsSuccess()) { + if (status.IsFail()) { return status; } @@ -171,7 +171,7 @@ class TFormatHadlerFixture : public TBaseFixture { UNIT_ASSERT_C(response, "Compilation is not performed for filter: " << whereFilter); } - return TStatus(); + return TStatus::Success(); } void ParseMessages(const TVector& messages, TVector expectedOffsets = {}) { @@ -182,7 +182,7 @@ class TFormatHadlerFixture : public TBaseFixture { ExtractClientsData(); } - void CheckClientError(const TVector& messages, NActors::TActorId clientId, TStatus::TStatusCode statusCode, const TString& message) { + void CheckClientError(const TVector& messages, NActors::TActorId clientId, TStatusCode statusCode, const TString& message) { for (auto& client : Clients) { client->ExpectOffsets({messages.back().GetOffset()}); if (client->GetClientId() == clientId) { @@ -328,19 +328,19 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { CheckError( FormatHandler->AddClient(MakeIntrusive(ClientIds.back(), schema, filter, callback)), - TStatus::EId::INTERNAL_ERROR, + EStatusId::INTERNAL_ERROR, "Failed to create new client, client with id [0:0:0] already exists" ); CheckError( MakeClient({{"data", "[OptionalType; [DataType; String]]"}}, filter, callback, 0), - TStatus::EId::SCHEME_ERROR, + EStatusId::SCHEME_ERROR, "Failed to modify common parsing schema" ); CheckError( MakeClient({{"data_2", "[ListType; [DataType; String]]"}}, filter, callback, 0), - TStatus::EId::UNSUPPORTED, + EStatusId::UNSUPPORTED, "Failed to update parser with new client" ); } @@ -375,7 +375,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) { CheckClientError( {GetMessage(firstOffset, R"({"com_col": "event1", "col_first": "some_str", "col_second": "str_second"})")}, ClientIds[0], - TStatus::EId::BAD_REQUEST, + EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << firstOffset << ", got parsing error for column 'col_first' with type [OptionalType; [DataType; Uint8]]" ); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 8e2e7bce69eb..79fe087cc936 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -18,7 +18,7 @@ class TFiterFixture : public TBaseFixture { using TPtr = TIntrusivePtr; public: - TFilterConsumer(const TVector& columns, const TString& whereFilter, TCallback callback, std::optional> compileError) + TFilterConsumer(const TVector& columns, const TString& whereFilter, TCallback callback, std::optional> compileError) : Columns(columns) , WhereFilter(whereFilter) , Callback(callback) @@ -60,7 +60,7 @@ class TFiterFixture : public TBaseFixture { Started = true; CheckError(status, CompileError->first, CompileError->second); } else { - UNIT_FAIL("Filtering failed: " << status.ToString()); + UNIT_FAIL("Filtering failed: " << status.GetErrorString()); } } @@ -78,7 +78,7 @@ class TFiterFixture : public TBaseFixture { const TVector Columns; const TString WhereFilter; const TCallback Callback; - const std::optional> CompileError; + const std::optional> CompileError; }; public: @@ -108,12 +108,13 @@ class TFiterFixture : public TBaseFixture { FilterHandler = MakeIntrusive(columns, whereFilter, callback, CompileError); auto filterStatus = CreatePurecalcFilter(FilterHandler); - if (filterStatus.IsSuccess()) { - Filter = std::move(filterStatus.GetValue()); - CompileFilter(); + if (filterStatus.IsFail()) { + return filterStatus; } - return filterStatus; + Filter = filterStatus.DetachResult(); + CompileFilter(); + return TStatus::Success(); } void Push(const TVector*>& values, ui64 numberRows = 0) { @@ -163,7 +164,7 @@ class TFiterFixture : public TBaseFixture { Filter->OnCompileResponse(std::move(response)); FilterHandler->OnFilterStarted(); } else { - CheckError(TStatus(response->Get()->Status, response->Get()->Issues), CompileError->first, CompileError->second); + CheckError(TStatus::Fail(response->Get()->Status, response->Get()->Issues), CompileError->first, CompileError->second); } } @@ -173,7 +174,7 @@ class TFiterFixture : public TBaseFixture { IPurecalcFilter::TPtr Filter; TList> Holders; - std::optional> CompileError; + std::optional> CompileError; }; class TFilterSetFixture : public TFiterFixture { @@ -186,7 +187,7 @@ class TFilterSetFixture : public TFiterFixture { using TPtr = TIntrusivePtr; public: - TFilterSetConsumer(NActors::TActorId filterId, const TVector& columnIds, const TVector& columns, const TString& whereFilter, TCallback callback, std::optional> compileError) + TFilterSetConsumer(NActors::TActorId filterId, const TVector& columnIds, const TVector& columns, const TString& whereFilter, TCallback callback, std::optional> compileError) : TBase(columns, whereFilter, callback, compileError) { FilterId = filterId; @@ -228,7 +229,7 @@ class TFilterSetFixture : public TFiterFixture { FilterIds.emplace_back(FilterIds.size(), 0, 0, 0); auto filterSetHandler = MakeIntrusive(FilterIds.back(), columnIds, columns, whereFilter, callback, CompileError); - if (auto status = FiltersSet->AddFilter(filterSetHandler)) { + if (auto status = FiltersSet->AddFilter(filterSetHandler); status.IsFail()) { return status; } @@ -239,7 +240,7 @@ class TFilterSetFixture : public TFiterFixture { FiltersSet->OnCompileResponse(std::move(response)); } - return TStatus(); + return TStatus::Success(); } void FilterData(const TVector& columnIndex, const TVector*>& values, ui64 numberRows = 0) { @@ -344,7 +345,7 @@ Y_UNIT_TEST_SUITE(TestPurecalcFilter) { } Y_UNIT_TEST_F(CompilationValidation, TFiterFixture) { - CompileError = {TStatus::EId::INTERNAL_ERROR, "Failed to compile purecalc program subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"}; + CompileError = {EStatusId::INTERNAL_ERROR, "Failed to compile purecalc program subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"}; MakeFilter( {{"a1", "[DataType; String]"}}, "where a2 ... 50", @@ -408,13 +409,13 @@ Y_UNIT_TEST_SUITE(TestFilterSet) { CheckError( FiltersSet->AddFilter(MakeIntrusive(FilterIds.back(), TVector(), TVector(), TString(), [&](ui64 offset) {}, CompileError)), - TStatus::EId::INTERNAL_ERROR, + EStatusId::INTERNAL_ERROR, "Failed to create new filter, filter with id [0:0:0] already exists" ); } Y_UNIT_TEST_F(CompilationValidation, TFilterSetFixture) { - CompileError = {TStatus::EId::INTERNAL_ERROR, "Failed to compile client filter subissue: {
: Error: Failed to compile purecalc program subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"}; + CompileError = {EStatusId::INTERNAL_ERROR, "Failed to compile client filter subissue: {
: Error: Failed to compile purecalc program subissue: {
: Error: Compile issues: generated.sql:2:36: Error: Unexpected token '.' : cannot match to any predicted input... } subissue: {
: Error: Final yql:"}; MakeFilter( {{"a1", "[DataType; String]"}}, "where a2 ... 50", diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index d0091d43d0f1..e2d46d53679d 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -24,11 +24,11 @@ class TBaseParserFixture : public TBaseFixture { , Callback(callback) {} - void ExpectColumnError(ui64 columnId, TStatus::TStatusCode statusCode, const TString& message) { + void ExpectColumnError(ui64 columnId, TStatusCode statusCode, const TString& message) { UNIT_ASSERT_C(ExpectedErrors.insert({columnId, {statusCode, message}}).second, "Can not add existing column error"); } - void ExpectCommonError(TStatus::TStatusCode statusCode, const TString& message) { + void ExpectCommonError(TStatusCode statusCode, const TString& message) { UNIT_ASSERT_C(!ExpectedCommonError, "Can not add existing common error"); ExpectedCommonError = {statusCode, message}; } @@ -81,8 +81,8 @@ class TBaseParserFixture : public TBaseFixture { const TVector Columns; const TCallback Callback; - std::optional> ExpectedCommonError; - std::unordered_map> ExpectedErrors; + std::optional> ExpectedCommonError; + std::unordered_map> ExpectedErrors; }; public: @@ -101,10 +101,12 @@ class TBaseParserFixture : public TBaseFixture { ParserHandler = MakeIntrusive(*this, columns, callback); auto parserStatus = CreateParser(); - if (parserStatus.IsSuccess()) { - Parser = std::move(parserStatus.GetValue()); + if (parserStatus.IsFail()) { + return parserStatus; } - return parserStatus; + + Parser = parserStatus.DetachResult(); + return TStatus::Success(); } TStatus MakeParser(TVector columnNames, TString columnType, TCallback callback) { @@ -128,13 +130,13 @@ class TBaseParserFixture : public TBaseFixture { Parser->ParseMessages({GetMessage(offset, data)}); } - void CheckColumnError(const TString& data, ui64 columnId, TStatus::TStatusCode statusCode, const TString& message) { + void CheckColumnError(const TString& data, ui64 columnId, TStatusCode statusCode, const TString& message) { ExpectedBatches++; ParserHandler->ExpectColumnError(columnId, statusCode, message); Parser->ParseMessages({GetMessage(ParserHandler->CurrentOffset, data)}); } - void CheckBatchError(const TString& data, TStatus::TStatusCode statusCode, const TString& message) { + void CheckBatchError(const TString& data, TStatusCode statusCode, const TString& message) { ExpectedBatches++; ParserHandler->ExpectCommonError(statusCode, message); Parser->ParseMessages({GetMessage(ParserHandler->CurrentOffset, data)}); @@ -366,55 +368,55 @@ Y_UNIT_TEST_SUITE(TestJsonParser) { Y_UNIT_TEST_F(MissingFieldsValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[DataType; String]"}, {"a2", "[DataType; Uint64]"}})); - CheckColumnError(R"({"a2": 105, "event": "event1"})", 0, TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << FIRST_OFFSET << " in non optional column 'a1' with type [DataType; String]"); - CheckColumnError(R"({"a1": "hello1", "a2": null, "event": "event1"})", 1, TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Uint64] subissue: {
: Error: Found unexpected null value, expected non optional data type Uint64 }"); + CheckColumnError(R"({"a2": 105, "event": "event1"})", 0, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << FIRST_OFFSET << " in non optional column 'a1' with type [DataType; String]"); + CheckColumnError(R"({"a1": "hello1", "a2": null, "event": "event1"})", 1, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Uint64] subissue: {
: Error: Found unexpected null value, expected non optional data type Uint64 }"); } Y_UNIT_TEST_F(TypeKindsValidation, TJsonParserFixture) { CheckError( MakeParser({{"a1", "[[BAD TYPE]]"}}), - TStatus::EId::INTERNAL_ERROR, + EStatusId::INTERNAL_ERROR, "Failed to parse column 'a1' type [[BAD TYPE]] subissue: {
: Error: Failed to parse type from yson: Failed to parse scheme from YSON:" ); CheckError( MakeParser({{"a2", "[OptionalType; [DataType; String]]"}, {"a1", "[ListType; [DataType; String]]"}}), - TStatus::EId::UNSUPPORTED, + EStatusId::UNSUPPORTED, "Failed to create parser for column 'a1' with type [ListType; [DataType; String]] subissue: {
: Error: Unsupported type kind: List }" ); } Y_UNIT_TEST_F(NumbersValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[DataType; Uint8]"}, {"a3", "[OptionalType; [DataType; Float]]"}})); - CheckColumnError(R"({"a1": 456, "a2": 42})", 0, TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: {
: Error: Failed to parse data type String from json number (raw: '456') subissue: {
: Error: Number value is not expected for data type String } }"); - CheckColumnError(R"({"a1": "456", "a2": -42})", 1, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Uint8] subissue: {
: Error: Failed to parse data type Uint8 from json number (raw: '-42') subissue: {
: Error: Failed to extract json integer number, error: INCORRECT_TYPE: The JSON element does not have the requested type. } }"); - CheckColumnError(R"({"a1": "str", "a2": 99999})", 1, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 2 << ", got parsing error for column 'a2' with type [DataType; Uint8] subissue: {
: Error: Failed to parse data type Uint8 from json number (raw: '99999') subissue: {
: Error: Number is out of range [0, 255] } }"); - CheckColumnError(R"({"a1": "456", "a2": 42, "a3": 1.11.1})", 2, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 3 << ", got parsing error for column 'a3' with type [OptionalType; [DataType; Float]] subissue: {
: Error: Failed to parse data type Float from json number (raw: '1.11.1') subissue: {
: Error: Failed to extract json float number, error: NUMBER_ERROR: Problem while parsing a number } }"); + CheckColumnError(R"({"a1": 456, "a2": 42})", 0, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: {
: Error: Failed to parse data type String from json number (raw: '456') subissue: {
: Error: Number value is not expected for data type String } }"); + CheckColumnError(R"({"a1": "456", "a2": -42})", 1, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Uint8] subissue: {
: Error: Failed to parse data type Uint8 from json number (raw: '-42') subissue: {
: Error: Failed to extract json integer number, error: INCORRECT_TYPE: The JSON element does not have the requested type. } }"); + CheckColumnError(R"({"a1": "str", "a2": 99999})", 1, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 2 << ", got parsing error for column 'a2' with type [DataType; Uint8] subissue: {
: Error: Failed to parse data type Uint8 from json number (raw: '99999') subissue: {
: Error: Number is out of range [0, 255] } }"); + CheckColumnError(R"({"a1": "456", "a2": 42, "a3": 1.11.1})", 2, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 3 << ", got parsing error for column 'a3' with type [OptionalType; [DataType; Float]] subissue: {
: Error: Failed to parse data type Float from json number (raw: '1.11.1') subissue: {
: Error: Failed to extract json float number, error: NUMBER_ERROR: Problem while parsing a number } }"); } Y_UNIT_TEST_F(StringsValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; Uint8]]"}})); - CheckColumnError(R"({"a1": "-456"})", 0, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; Uint8]] subissue: {
: Error: Failed to parse data type Uint8 from json string: '-456' }"); + CheckColumnError(R"({"a1": "-456"})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; Uint8]] subissue: {
: Error: Failed to parse data type Uint8 from json string: '-456' }"); } Y_UNIT_TEST_F(NestedJsonValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; Json]]"}, {"a2", "[OptionalType; [DataType; String]]"}})); - CheckColumnError(R"({"a1": {"key": "value"}, "a2": {"key2": "value2"}})", 1, TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a2' with type [OptionalType; [DataType; String]] subissue: {
: Error: Found unexpected nested value (raw: '{\"key2\": \"value2\"}'), expected data type String, please use Json type for nested values }"); - CheckColumnError(R"({"a1": {"key": "value", "nested": {"a": "b", "c":}}, "a2": "str"})", 0, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a1' with type [OptionalType; [DataType; Json]] subissue: {
: Error: Found bad json value: '{\"key\": \"value\", \"nested\": {\"a\": \"b\", \"c\":}}' }"); - CheckColumnError(R"({"a1": {"key" "value"}, "a2": "str"})", 0, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 2 << ", got parsing error for column 'a1' with type [OptionalType; [DataType; Json]] subissue: {
: Error: Failed to extract json value, current token: '{', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }"); + CheckColumnError(R"({"a1": {"key": "value"}, "a2": {"key2": "value2"}})", 1, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a2' with type [OptionalType; [DataType; String]] subissue: {
: Error: Found unexpected nested value (raw: '{\"key2\": \"value2\"}'), expected data type String, please use Json type for nested values }"); + CheckColumnError(R"({"a1": {"key": "value", "nested": {"a": "b", "c":}}, "a2": "str"})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a1' with type [OptionalType; [DataType; Json]] subissue: {
: Error: Found bad json value: '{\"key\": \"value\", \"nested\": {\"a\": \"b\", \"c\":}}' }"); + CheckColumnError(R"({"a1": {"key" "value"}, "a2": "str"})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 2 << ", got parsing error for column 'a1' with type [OptionalType; [DataType; Json]] subissue: {
: Error: Failed to extract json value, current token: '{', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }"); } Y_UNIT_TEST_F(BoolsValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}, {"a2", "[DataType; Bool]"}})); - CheckColumnError(R"({"a1": true, "a2": false})", 0, TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: {
: Error: Found unexpected bool value, expected data type String }"); - CheckColumnError(R"({"a1": "true", "a2": falce})", 1, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Bool] subissue: {
: Error: Failed to extract json bool, current token: 'falce', error: INCORRECT_TYPE: The JSON element does not have the requested type. }"); + CheckColumnError(R"({"a1": true, "a2": false})", 0, EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: {
: Error: Found unexpected bool value, expected data type String }"); + CheckColumnError(R"({"a1": "true", "a2": falce})", 1, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET + 1 << ", got parsing error for column 'a2' with type [DataType; Bool] subissue: {
: Error: Failed to extract json bool, current token: 'falce', error: INCORRECT_TYPE: The JSON element does not have the requested type. }"); } Y_UNIT_TEST_F(JsonStructureValidation, TJsonParserFixture) { CheckSuccess(MakeParser({{"a1", "[OptionalType; [DataType; String]]"}})); - CheckColumnError(R"({"a1": Yelse})", 0, TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: {
: Error: Failed to determine json value type, current token: 'Yelse', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }"); - CheckBatchError(R"({"a1": "st""r"})", TStatus::EId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc."); - CheckBatchError(R"({"a1": "x"} {"a1": "y"})", TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2"); - CheckBatchError(R"({)", TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0"); + CheckColumnError(R"({"a1": Yelse})", 0, EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json string at offset " << FIRST_OFFSET << ", got parsing error for column 'a1' with type [OptionalType; [DataType; String]] subissue: {
: Error: Failed to determine json value type, current token: 'Yelse', error: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc. }"); + CheckBatchError(R"({"a1": "st""r"})", EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << FIRST_OFFSET + 1 << ", json item was corrupted: TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc."); + CheckBatchError(R"({"a1": "x"} {"a1": "y"})", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 2 << " but got 2"); + CheckBatchError(R"({)", EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected 1 json rows from offset " << FIRST_OFFSET + 3 << " but got 0"); } } @@ -456,17 +458,17 @@ Y_UNIT_TEST_SUITE(TestRawParser) { Y_UNIT_TEST_F(TypeKindsValidation, TRawParserFixture) { CheckError( MakeParser({{"a1", "[DataType; String]"}, {"a2", "[DataType; String]"}}), - TStatus::EId::INTERNAL_ERROR, + EStatusId::INTERNAL_ERROR, "Expected only one column for raw format, but got 2" ); CheckError( MakeParser({{"a1", "[[BAD TYPE]]"}}), - TStatus::EId::INTERNAL_ERROR, + EStatusId::INTERNAL_ERROR, "Failed to create raw parser for column 'a1' : [[BAD TYPE]] subissue: {
: Error: Failed to parse type from yson: Failed to parse scheme from YSON:" ); CheckError( MakeParser({{"a1", "[ListType; [DataType; String]]"}}), - TStatus::EId::UNSUPPORTED, + EStatusId::UNSUPPORTED, "Failed to create raw parser for column 'a1' : [ListType; [DataType; String]] subissue: {
: Error: Unsupported type kind for raw format: List }" ); } diff --git a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp index a17237a501a6..c4e301fd7926 100644 --- a/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/purecalc_compilation/compile_service.cpp @@ -29,21 +29,20 @@ class TPurecalcCompileService : public NActors::TActor LOG_ROW_DISPATCHER_TRACE("Got compile request with id: " << ev->Cookie); IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder); - TStatus status; + TStatus status = TStatus::Success(); try { programHolder->CreateProgram(GetOrCreateFactory(ev->Get()->Settings)); } catch (const NYql::NPureCalc::TCompileError& error) { - status = TStatus(TStatus::EId::INTERNAL_ERROR) - .AddIssue(TStringBuilder() << "Compile issues: " << error.GetIssues()) + status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Compile issues: " << error.GetIssues()) .AddIssue(TStringBuilder() << "Final yql: " << error.GetYql()) .AddParentIssue(TStringBuilder() << "Failed to compile purecalc program"); } catch (...) { - status = TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to compile purecalc program, got unexpected exception: " << CurrentExceptionMessage()); + status = TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to compile purecalc program, got unexpected exception: " << CurrentExceptionMessage()); } - if (!status.IsSuccess()) { + if (status.IsFail()) { LOG_ROW_DISPATCHER_ERROR("Compilation failed for request with id: " << ev->Cookie); - Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetIssues()), 0, ev->Cookie); + Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(status.GetStatus(), status.GetErrorMessage()), 0, ev->Cookie); } else { LOG_ROW_DISPATCHER_TRACE("Compilation completed for request with id: " << ev->Cookie); Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index e5f5725b3827..872b954fe298 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -567,7 +567,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose const TString message = TStringBuilder() << "Read session to topic \"" << Self.TopicPathPartition << "\" was closed"; LOG_ROW_DISPATCHER_DEBUG(message << ": " << ev.DebugString()); - Self.FatalError(TStatus( + Self.FatalError(TStatus::Fail( NYql::NDq::YdbStatusToDqStatus(static_cast(ev.GetStatus())), ev.GetIssues() ).AddParentIssue(message)); @@ -707,7 +707,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(ActorContext(), FormatHandlerConfig, handlerSettings, Metrics.PartitionGroup)}).first; } - if (auto status = formatIt->second->AddClient(clientInfo)) { + if (auto status = formatIt->second->AddClient(clientInfo); status.IsFail()) { SendSessionError(clientInfo->ReadActorId, status); return; } @@ -741,7 +741,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) { } void TTopicSession::FatalError(TStatus status) { - LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.ToString()); + LOG_ROW_DISPATCHER_ERROR("FatalError: " << status.GetErrorString()); for (auto& [readActorId, info] : Clients) { LOG_ROW_DISPATCHER_DEBUG("Send TEvSessionError to " << readActorId); @@ -749,14 +749,14 @@ void TTopicSession::FatalError(TStatus status) { } StopReadSession(); Become(&TTopicSession::ErrorState); - ythrow yexception() << "FatalError: " << status.ToString(); // To exit from current stack and call once PassAway() in HandleException(). + ythrow yexception() << "FatalError: " << status.GetErrorString(); // To exit from current stack and call once PassAway() in HandleException(). } void TTopicSession::SendSessionError(TActorId readActorId, TStatus status) { - LOG_ROW_DISPATCHER_WARN("SendSessionError to " << readActorId << ", status: " << status.ToString()); + LOG_ROW_DISPATCHER_WARN("SendSessionError to " << readActorId << ", status: " << status.GetErrorString()); auto event = std::make_unique(); event->Record.SetStatusCode(status.GetStatus()); - NYql::IssuesToMessage(status.GetIssues(), event->Record.MutableIssues()); + NYql::IssuesToMessage(status.GetErrorMessage(), event->Record.MutableIssues()); event->Record.SetPartitionId(PartitionId); event->ReadActorId = readActorId; Send(RowDispatcherActorId, event.release()); @@ -788,7 +788,7 @@ void TTopicSession::HandleException(const std::exception& e) { if (CurrentStateFunc() == &TThis::ErrorState) { return; } - FatalError(TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Session error, got unexpected exception: " << e.what())); + FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Session error, got unexpected exception: " << e.what())); } void TTopicSession::SendStatisticToRowDispatcher() { @@ -838,14 +838,14 @@ bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& auto it = Clients.find(ev->Sender); if (it != Clients.end()) { LOG_ROW_DISPATCHER_ERROR("Such a client already exists"); - SendSessionError(ev->Sender, TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Client with id " << ev->Sender << " already exists")); + SendSessionError(ev->Sender, TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Client with id " << ev->Sender << " already exists")); return false; } const auto& source = ev->Get()->Record.GetSource(); if (!Config.GetWithoutConsumer() && ConsumerName && ConsumerName != source.GetConsumerName()) { LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << source.GetConsumerName() << ", send error"); - SendSessionError(ev->Sender, TStatus(TStatus::EId::PRECONDITION_FAILED, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")")); + SendSessionError(ev->Sender, TStatus::Fail(EStatusId::PRECONDITION_FAILED, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")")); return false; } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 5892d8e13f1f..07c3ed2fb901 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -136,7 +136,7 @@ class TFixture : public NTests::TBaseFixture { CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expected); } - void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatusCode statusCode, TString message = "") { + void ExpectSessionError(NActors::TActorId readActorId, TStatusCode statusCode, TString message = "") { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); @@ -144,7 +144,7 @@ class TFixture : public NTests::TBaseFixture { const auto& record = eventHolder->Get()->Record; NYql::TIssues issues; NYql::IssuesFromMessage(record.GetIssues(), issues); - NTests::CheckError(TStatus(record.GetStatusCode(), std::move(issues)), statusCode, message); + NTests::CheckError(TStatus::Fail(record.GetStatusCode(), std::move(issues)), statusCode, message); } void ExpectNewDataArrived(TSet readActorIds) { @@ -223,7 +223,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { auto source2 = BuildSource(topicName, false, "OtherConsumer"); StartSession(ReadActorId3, source2, Nothing(), true); - ExpectSessionError(ReadActorId3, TStatus::EId::PRECONDITION_FAILED, "Use the same consumer"); + ExpectSessionError(ReadActorId3, EStatusId::PRECONDITION_FAILED, "Use the same consumer"); StopSession(ReadActorId1, source); StopSession(ReadActorId2, source); @@ -332,7 +332,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { "not json", "noch einmal / nicht json" }; PQWrite(data, topicName); - ExpectSessionError(ReadActorId1, TStatus::EId::BAD_REQUEST, "INCORRECT_TYPE: The JSON element does not have the requested type."); + ExpectSessionError(ReadActorId1, EStatusId::BAD_REQUEST, "INCORRECT_TYPE: The JSON element does not have the requested type."); StopSession(ReadActorId1, source); } @@ -351,12 +351,12 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { PQWrite({ Json1 }, topicName); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(1) }); - ExpectSessionError(ReadActorId2, TStatus::EId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); + ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); PQWrite({ Json2 }, topicName); ExpectNewDataArrived({ReadActorId1}); ExpectMessageBatch(ReadActorId1, { JsonMessage(2) }); - ExpectSessionError(ReadActorId2, TStatus::EId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); + ExpectSessionError(ReadActorId2, EStatusId::PRECONDITION_FAILED, "Failed to parse json messages, found 1 missing values"); StopSession(ReadActorId1, source); StopSession(ReadActorId2, source); @@ -393,7 +393,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { Init(topicName); auto source = BuildSource(topicName); StartSession(ReadActorId1, source); - ExpectSessionError(ReadActorId1, TStatus::EId::SCHEME_ERROR, "no path"); + ExpectSessionError(ReadActorId1, EStatusId::SCHEME_ERROR, "no path"); StopSession(ReadActorId1, source); } @@ -510,7 +510,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { source2.AddColumns("field1"); source2.AddColumnTypes("[DataType; String]"); StartSession(ReadActorId2, source2, Nothing(), true); - ExpectSessionError(ReadActorId2, TStatus::EId::SCHEME_ERROR, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])"); + ExpectSessionError(ReadActorId2, EStatusId::SCHEME_ERROR, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])"); } } diff --git a/ydb/library/conclusion/generic/result.h b/ydb/library/conclusion/generic/result.h index b0d93d3a404d..02120c3a4a30 100644 --- a/ydb/library/conclusion/generic/result.h +++ b/ydb/library/conclusion/generic/result.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -55,19 +56,19 @@ class TConclusionImpl { const TResult& GetResult() const { auto result = std::get_if(&Result); - Y_ABORT_UNLESS(result, "incorrect object for result request"); + Y_ABORT_UNLESS(result, "incorrect object for result request: %s", GetErrorString().data()); return *result; } TResult& MutableResult() { auto result = std::get_if(&Result); - Y_ABORT_UNLESS(result, "incorrect object for result request"); + Y_ABORT_UNLESS(result, "incorrect object for result request: %s", GetErrorString().data()); return *result; } TResult&& DetachResult() { auto result = std::get_if(&Result); - Y_ABORT_UNLESS(result, "incorrect object for result request: %s", GetErrorMessage().data()); + Y_ABORT_UNLESS(result, "incorrect object for result request: %s", GetErrorString().data()); return std::move(*result); } @@ -91,10 +92,15 @@ class TConclusionImpl { return GetError(); } - const TString& GetErrorMessage() const { + TString GetErrorString() const { + auto* status = std::get_if(&Result); + return status ? status->GetErrorString() : Default(); + } + + const auto& GetErrorMessage() const { auto* status = std::get_if(&Result); if (!status) { - return Default(); + return TStatus::Success().GetErrorMessage(); } else { return status->GetErrorMessage(); } diff --git a/ydb/library/conclusion/generic/status.h b/ydb/library/conclusion/generic/status.h index 26be88712b50..fce80935726d 100644 --- a/ydb/library/conclusion/generic/status.h +++ b/ydb/library/conclusion/generic/status.h @@ -1,73 +1,60 @@ #pragma once +#include + #include +#include #include #include namespace NKikimr { -template -class TConclusionStatusImpl { -private: - std::optional ErrorMessage; +template +class TConclusionStatusGenericImpl { +protected: + std::optional ErrorMessage; TStatus Status = StatusOk; - TConclusionStatusImpl() = default; - TConclusionStatusImpl(const TString& errorMessage, TStatus status = DefaultError) - : ErrorMessage(errorMessage) - , Status(status) { - Y_ABORT_UNLESS(!!ErrorMessage); - } - TConclusionStatusImpl(const char* errorMessage, TStatus status = DefaultError) - : ErrorMessage(errorMessage) + TConclusionStatusGenericImpl() = default; + + TConclusionStatusGenericImpl(const TError& error, TStatus status = DefaultError) + : ErrorMessage(error) , Status(status) { Y_ABORT_UNLESS(!!ErrorMessage); } - TConclusionStatusImpl(const std::string& errorMessage, TStatus status = DefaultError) - : ErrorMessage(TString(errorMessage.data(), errorMessage.size())) + TConclusionStatusGenericImpl(TError&& error, TStatus status = DefaultError) + : ErrorMessage(std::move(error)) , Status(status) { Y_ABORT_UNLESS(!!ErrorMessage); } public: - void Validate(const TString& processInfo = Default()) const { - if (processInfo) { - Y_ABORT_UNLESS(Ok(), "error=%s, processInfo=%s", GetErrorMessage().c_str(), processInfo.c_str()); - } else { - Y_ABORT_UNLESS(Ok(), "error=%s", GetErrorMessage().c_str()); - } + virtual ~TConclusionStatusGenericImpl() = default; + +public: + [[nodiscard]] const TError& GetErrorMessage() const { + return ErrorMessage ? *ErrorMessage : Default(); } - [[nodiscard]] const TString& GetErrorMessage() const { - return ErrorMessage ? *ErrorMessage : Default(); + [[nodiscard]] virtual TString GetErrorString() const { + return ErrorMessage ? ToString(*ErrorMessage) : Default(); } [[nodiscard]] TStatus GetStatus() const { return Status; } - [[nodiscard]] static TConclusionStatusImpl Fail(const char* errorMessage) { - return TConclusionStatusImpl(errorMessage); + template + [[nodiscard]] static TDerived Fail(const TErrorMessage& errorMessage) { + return TDerived(errorMessage); } - [[nodiscard]] static TConclusionStatusImpl Fail(const TString& errorMessage) { - return TConclusionStatusImpl(errorMessage); - } - - [[nodiscard]] static TConclusionStatusImpl Fail(const std::string& errorMessage) { - return TConclusionStatusImpl(errorMessage); - } - - [[nodiscard]] static TConclusionStatusImpl Fail(const TStatus& status, const char* errorMessage) { + template + [[nodiscard]] static TDerived Fail(const TStatus& status, const TErrorMessage& errorMessage) { Y_ABORT_UNLESS(DefaultError == StatusOk || status != StatusOk); - return TConclusionStatusImpl(errorMessage, status); - } - - [[nodiscard]] static TConclusionStatusImpl Fail(const TStatus& status, const TString& errorMessage) { - Y_ABORT_UNLESS(DefaultError == StatusOk || status != StatusOk); - return TConclusionStatusImpl(errorMessage, status); + return TDerived(errorMessage, status); } [[nodiscard]] bool IsFail() const { @@ -86,8 +73,81 @@ class TConclusionStatusImpl { return !!ErrorMessage; } - [[nodiscard]] static TConclusionStatusImpl Success() { - return TConclusionStatusImpl(); + [[nodiscard]] static TDerived Success() { + return TDerived(); + } +}; + +template +class TConclusionStatusImpl : public TConclusionStatusGenericImpl> { +protected: + friend class TConclusionStatusGenericImpl>; + + using TBase = TConclusionStatusGenericImpl>; + using TBase::TBase; + + TConclusionStatusImpl() = default; + + TConclusionStatusImpl(const char* errorMessage, TStatus status = DefaultError) + : TBase(TString(errorMessage), status) { + } + + TConclusionStatusImpl(const std::string& errorMessage, TStatus status = DefaultError) + : TBase(TString(errorMessage), status) { + } + +public: + void Validate(const TString& processInfo = Default()) const { + if (processInfo) { + Y_ABORT_UNLESS(TBase::Ok(), "error=%s, processInfo=%s", TBase::GetErrorMessage().c_str(), processInfo.c_str()); + } else { + Y_ABORT_UNLESS(TBase::Ok(), "error=%s", TBase::GetErrorMessage().c_str()); + } + } +}; + +template +class TConclusionStatusIssueImpl : public TConclusionStatusGenericImpl> { +protected: + friend class TConclusionStatusGenericImpl>; + + using TBase = TConclusionStatusGenericImpl>; + using TBase::TBase; + + TConclusionStatusIssueImpl() = default; + + TConclusionStatusIssueImpl(const TString& errorMessage, TStatus status = DefaultError) + : TBase({NYql::TIssue(errorMessage)}, status) { + } + +public: + TConclusionStatusIssueImpl& AddParentIssue(NYql::TIssue issue) { + Y_ABORT_UNLESS(!!TBase::ErrorMessage); + for (const auto& childIssue : *TBase::ErrorMessage) { + issue.AddSubIssue(MakeIntrusive(childIssue)); + } + TBase::ErrorMessage = {std::move(issue)}; + return *this; + } + + TConclusionStatusIssueImpl& AddParentIssue(const TString& message) { + AddParentIssue(NYql::TIssue(message)); + return *this; + } + + TConclusionStatusIssueImpl& AddIssue(NYql::TIssue issue) { + Y_ABORT_UNLESS(!!TBase::ErrorMessage); + TBase::ErrorMessage->AddIssue(std::move(issue)); + return *this; + } + + TConclusionStatusIssueImpl& AddIssue(const TString& message) { + AddIssue(NYql::TIssue(message)); + return *this; + } + + [[nodiscard]] virtual TString GetErrorString() const override { + return TBase::GetErrorMessage().ToOneLineString(); } }; diff --git a/ydb/library/conclusion/generic/ya.make b/ydb/library/conclusion/generic/ya.make index 1e614b2bfb5c..5232905ff9ff 100644 --- a/ydb/library/conclusion/generic/ya.make +++ b/ydb/library/conclusion/generic/ya.make @@ -4,6 +4,8 @@ SRCS() PEERDIR( util + + yql/essentials/public/issue ) END()