diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index e36172a85b32..5aadf813a94d 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -1,13 +1,15 @@ #pragma once -#include "format_handler/common/common.h" - #include -#include +#include + #include -#include #include +#include + +#include + namespace NFq::NRowDispatcher { struct IActorFactory : public TThrRefBase { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp index 1431fd0bbcfb..682ee4ebb196 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.cpp @@ -46,12 +46,12 @@ TStatus::TStatus() : Status(EId::SUCCESS) {} -TStatus::TStatus(TStatucCode status, NYql::TIssues issues) +TStatus::TStatus(TStatusCode status, NYql::TIssues issues) : Status(status) , Issues(std::move(issues)) {} -TStatus::TStatus(TStatucCode status, TString message) +TStatus::TStatus(TStatusCode status, TString message) : Status(status) , Issues({NYql::TIssue(std::move(message))}) {} @@ -60,7 +60,7 @@ bool TStatus::IsSuccess() const { return Status == EId::SUCCESS; } -TStatus::TStatucCode TStatus::GetStatus() const { +TStatus::TStatusCode TStatus::GetStatus() const { return Status; } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h index 8859320b34a7..6e3016cb6e8b 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h @@ -16,16 +16,16 @@ struct TFormatHandlerException : public yexception { class TStatus { public: using EId = NYql::NDqProto::StatusIds; - using TStatucCode = EId::StatusCode; + using TStatusCode = EId::StatusCode; public: TStatus(); - explicit TStatus(TStatucCode status, NYql::TIssues issues = {}); - TStatus(TStatucCode status, TString message); + explicit TStatus(TStatusCode status, NYql::TIssues issues = {}); + TStatus(TStatusCode status, TString message); virtual bool IsSuccess() const; - TStatucCode GetStatus() const; + TStatusCode GetStatus() const; const NYql::TIssues& GetIssues() const; TString ToString() const; @@ -37,7 +37,7 @@ class TStatus { TStatus& AddIssue(TString message); protected: - TStatucCode Status; + TStatusCode Status; NYql::TIssues Issues; }; diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index a195adc7ae10..1cc55cb0ce6c 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -1,8 +1,7 @@ #include "format_handler.h" -#include "parsers/raw_parser.h" - #include +#include #include diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index c19c1b000634..264f4b219b79 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -1,7 +1,7 @@ #pragma once -#include "filters/filters_set.h" -#include "parsers/json_parser.h" +#include +#include #include diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp index cbbd0b96d06e..3561bb20a37e 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp @@ -90,7 +90,7 @@ void CheckSuccess(const TStatus& status) { UNIT_ASSERT_C(status.IsSuccess(), TStringBuilder() << "Status is not success, " << status.ToString()); } -void CheckError(const TStatus& status, TStatus::TStatucCode expectedStatusCode, const TString& expectedMessage) { +void CheckError(const TStatus& status, TStatus::TStatusCode expectedStatusCode, const TString& expectedMessage) { UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, TStringBuilder() << "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << status.ToString()); UNIT_ASSERT_STRING_CONTAINS_C(status.GetIssues().ToOneLineString(), expectedMessage, TStringBuilder() << "Unexpected error message, Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus())); } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h index 3d2d9226e196..e63f8ed42024 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h @@ -42,7 +42,7 @@ class TBaseFixture : public NUnitTest::TBaseFixture { }; void CheckSuccess(const TStatus& status); -void CheckError(const TStatus& status, TStatus::TStatucCode expectedStatusCode, const TString& expectedMessage); +void CheckError(const TStatus& status, TStatus::TStatusCode expectedStatusCode, const TString& expectedMessage); template TValue CheckSuccess(TValueStatus valueStatus) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp index cfcc94ab7235..be949576f94f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp @@ -1,6 +1,5 @@ -#include "common/ut_common.h" - #include +#include namespace NFq::NRowDispatcher::NTests { @@ -37,7 +36,7 @@ class TFormatHadlerFixture : public TBaseFixture { } } - void ExpectError(TStatus::TStatucCode statusCode, const TString& message) { + void ExpectError(TStatus::TStatusCode statusCode, const TString& message) { UNIT_ASSERT_C(!ExpectedError, "Can not add existing error, client id: " << ClientId); ExpectedError = {statusCode, message}; } @@ -100,7 +99,7 @@ class TFormatHadlerFixture : public TBaseFixture { bool Frozen = false; ui64 ExpectedFilteredRows = 0; std::queue Offsets; - std::optional> ExpectedError; + std::optional> ExpectedError; }; public: @@ -143,7 +142,7 @@ class TFormatHadlerFixture : public TBaseFixture { FormatHandler->ParseMessages(messages); } - void CheckClientError(const TVector& messages, NActors::TActorId clientId, TStatus::TStatucCode statusCode, const TString& message) { + void CheckClientError(const TVector& messages, NActors::TActorId clientId, TStatus::TStatusCode statusCode, const TString& message) { for (auto& client : Clients) { client->ExpectOffsets({messages.back().GetOffset()}); if (client->GetClientId() == clientId) { diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp index 8b8ca36aecd2..ee089dc56599 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp @@ -1,7 +1,6 @@ -#include "common/ut_common.h" - #include #include +#include #include diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp index c657c74f5a42..2af179268891 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_parser_ut.cpp @@ -1,7 +1,6 @@ -#include "common/ut_common.h" - #include #include +#include namespace NFq::NRowDispatcher::NTests { @@ -24,11 +23,11 @@ class TBaseParserFixture : public TBaseFixture { , Callback(callback) {} - void ExpectColumnError(ui64 columnId, TStatus::TStatucCode statusCode, const TString& message) { + void ExpectColumnError(ui64 columnId, TStatus::TStatusCode statusCode, const TString& message) { UNIT_ASSERT_C(ExpectedErrors.insert({columnId, {statusCode, message}}).second, "Can not add existing column error"); } - void ExpectCommonError(TStatus::TStatucCode statusCode, const TString& message) { + void ExpectCommonError(TStatus::TStatusCode statusCode, const TString& message) { UNIT_ASSERT_C(!ExpectedCommonError, "Can not add existing common error"); ExpectedCommonError = {statusCode, message}; } @@ -81,8 +80,8 @@ class TBaseParserFixture : public TBaseFixture { const TVector Columns; const TCallback Callback; - std::optional> ExpectedCommonError; - std::unordered_map> ExpectedErrors; + std::optional> ExpectedCommonError; + std::unordered_map> ExpectedErrors; }; public: @@ -126,13 +125,13 @@ class TBaseParserFixture : public TBaseFixture { Parser->ParseMessages({GetMessage(offset, data)}); } - void CheckColumnError(const TString& data, ui64 columnId, TStatus::TStatucCode statusCode, const TString& message) { + void CheckColumnError(const TString& data, ui64 columnId, TStatus::TStatusCode statusCode, const TString& message) { ExpectedBatches++; ParserHandler->ExpectColumnError(columnId, statusCode, message); Parser->ParseMessages({GetMessage(ParserHandler->CurrentOffset, data)}); } - void CheckBatchError(const TString& data, TStatus::TStatucCode statusCode, const TString& message) { + void CheckBatchError(const TString& data, TStatus::TStatusCode statusCode, const TString& message) { ExpectedBatches++; ParserHandler->ExpectCommonError(statusCode, message); Parser->ParseMessages({GetMessage(ParserHandler->CurrentOffset, data)}); diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index b6d939c7214b..e031324880be 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -1,6 +1,8 @@ #include "row_dispatcher.h" -#include "format_handler/common/common.h" + +#include "actors_factory.h" #include "coordinator.h" +#include "leader_election.h" #include #include @@ -15,9 +17,8 @@ #include #include -#include #include -#include +#include #include #include diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h index 1996526bd70a..d57bf01efdf3 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h @@ -1,15 +1,14 @@ #pragma once -#include #include +#include +#include #include +#include #include #include #include -#include "events/data_plane.h" - -#include #include diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 3d72a1043207..04d9081b8523 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -1,15 +1,15 @@ #include "topic_session.h" -#include "format_handler/format_handler.h" - #include - #include -#include -#include +#include + #include #include +#include +#include #include + #include #include @@ -304,6 +304,7 @@ class TTopicSession : public TActorBootstrapped { IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession); IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession); IgnoreFunc(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher); + IgnoreFunc(NFq::TEvPrivate::TEvRefreshFormatHandlers); }) }; diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index 4a1c5ade5b74..a8aeca5d9017 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -1,18 +1,15 @@ #pragma once -#include "format_handler/common/common.h" - #include #include +#include #include +#include #include - #include #include -#include - #include namespace NFq { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 1497c6723736..227e0b54af2a 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -164,7 +164,7 @@ class TFixture : public NTests::TBaseFixture { } } - void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatucCode statusCode, TString message = "") { + void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatusCode statusCode, TString message = "") { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 94e31a4f7983..5e617b633b88 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -216,7 +216,7 @@ def test_scheme_error(self, kikimr, client): client.wait_query_status(query_id, fq.QueryMeta.FAILED) issues = str(client.describe_query(query_id).result.query.issue) - assert "Cannot parse JSON string" in issues, "Incorrect Issues: " + issues + assert "Failed to parse json message for offset" in issues, "Incorrect Issues: " + issues wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 0) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)