Skip to content

Commit

Permalink
Fexed test error
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 26, 2024
1 parent 42238be commit 4fb6241
Show file tree
Hide file tree
Showing 16 changed files with 48 additions and 52 deletions.
10 changes: 6 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#pragma once

#include "format_handler/common/common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>

#include <util/generic/ptr.h>

namespace NFq::NRowDispatcher {

struct IActorFactory : public TThrRefBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ TStatus::TStatus()
: Status(EId::SUCCESS)
{}

TStatus::TStatus(TStatucCode status, NYql::TIssues issues)
TStatus::TStatus(TStatusCode status, NYql::TIssues issues)
: Status(status)
, Issues(std::move(issues))
{}

TStatus::TStatus(TStatucCode status, TString message)
TStatus::TStatus(TStatusCode status, TString message)
: Status(status)
, Issues({NYql::TIssue(std::move(message))})
{}
Expand All @@ -60,7 +60,7 @@ bool TStatus::IsSuccess() const {
return Status == EId::SUCCESS;
}

TStatus::TStatucCode TStatus::GetStatus() const {
TStatus::TStatusCode TStatus::GetStatus() const {
return Status;
}

Expand Down
10 changes: 5 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ struct TFormatHandlerException : public yexception {
class TStatus {
public:
using EId = NYql::NDqProto::StatusIds;
using TStatucCode = EId::StatusCode;
using TStatusCode = EId::StatusCode;

public:
TStatus();
explicit TStatus(TStatucCode status, NYql::TIssues issues = {});
TStatus(TStatucCode status, TString message);
explicit TStatus(TStatusCode status, NYql::TIssues issues = {});
TStatus(TStatusCode status, TString message);

virtual bool IsSuccess() const;

TStatucCode GetStatus() const;
TStatusCode GetStatus() const;
const NYql::TIssues& GetIssues() const;

TString ToString() const;
Expand All @@ -37,7 +37,7 @@ class TStatus {
TStatus& AddIssue(TString message);

protected:
TStatucCode Status;
TStatusCode Status;
NYql::TIssues Issues;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include "format_handler.h"

#include "parsers/raw_parser.h"

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h>

#include <ydb/library/yql/dq/common/rope_over_buffer.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "filters/filters_set.h"
#include "parsers/json_parser.h"
#include <ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h>

#include <ydb/library/actors/util/rope.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void CheckSuccess(const TStatus& status) {
UNIT_ASSERT_C(status.IsSuccess(), TStringBuilder() << "Status is not success, " << status.ToString());
}

void CheckError(const TStatus& status, TStatus::TStatucCode expectedStatusCode, const TString& expectedMessage) {
void CheckError(const TStatus& status, TStatus::TStatusCode expectedStatusCode, const TString& expectedMessage) {
UNIT_ASSERT_C(status.GetStatus() == expectedStatusCode, TStringBuilder() << "Expected error status " << NYql::NDqProto::StatusIds_StatusCode_Name(expectedStatusCode) << ", but got: " << status.ToString());
UNIT_ASSERT_STRING_CONTAINS_C(status.GetIssues().ToOneLineString(), expectedMessage, TStringBuilder() << "Unexpected error message, Status: " << NYql::NDqProto::StatusIds_StatusCode_Name(status.GetStatus()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TBaseFixture : public NUnitTest::TBaseFixture {
};

void CheckSuccess(const TStatus& status);
void CheckError(const TStatus& status, TStatus::TStatucCode expectedStatusCode, const TString& expectedMessage);
void CheckError(const TStatus& status, TStatus::TStatusCode expectedStatusCode, const TString& expectedMessage);

template <typename TValue>
TValue CheckSuccess(TValueStatus<TValue> valueStatus) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "common/ut_common.h"

#include <ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h>

namespace NFq::NRowDispatcher::NTests {

Expand Down Expand Up @@ -37,7 +36,7 @@ class TFormatHadlerFixture : public TBaseFixture {
}
}

void ExpectError(TStatus::TStatucCode statusCode, const TString& message) {
void ExpectError(TStatus::TStatusCode statusCode, const TString& message) {
UNIT_ASSERT_C(!ExpectedError, "Can not add existing error, client id: " << ClientId);
ExpectedError = {statusCode, message};
}
Expand Down Expand Up @@ -100,7 +99,7 @@ class TFormatHadlerFixture : public TBaseFixture {
bool Frozen = false;
ui64 ExpectedFilteredRows = 0;
std::queue<ui64> Offsets;
std::optional<std::pair<TStatus::TStatucCode, TString>> ExpectedError;
std::optional<std::pair<TStatus::TStatusCode, TString>> ExpectedError;
};

public:
Expand Down Expand Up @@ -143,7 +142,7 @@ class TFormatHadlerFixture : public TBaseFixture {
FormatHandler->ParseMessages(messages);
}

void CheckClientError(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages, NActors::TActorId clientId, TStatus::TStatucCode statusCode, const TString& message) {
void CheckClientError(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages, NActors::TActorId clientId, TStatus::TStatusCode statusCode, const TString& message) {
for (auto& client : Clients) {
client->ExpectOffsets({messages.back().GetOffset()});
if (client->GetClientId() == clientId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "common/ut_common.h"

#include <ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/parsers/parser_base.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h>

#include <yql/essentials/minikql/mkql_string_util.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "common/ut_common.h"

#include <ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/parsers/raw_parser.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h>

namespace NFq::NRowDispatcher::NTests {

Expand All @@ -24,11 +23,11 @@ class TBaseParserFixture : public TBaseFixture {
, Callback(callback)
{}

void ExpectColumnError(ui64 columnId, TStatus::TStatucCode statusCode, const TString& message) {
void ExpectColumnError(ui64 columnId, TStatus::TStatusCode statusCode, const TString& message) {
UNIT_ASSERT_C(ExpectedErrors.insert({columnId, {statusCode, message}}).second, "Can not add existing column error");
}

void ExpectCommonError(TStatus::TStatucCode statusCode, const TString& message) {
void ExpectCommonError(TStatus::TStatusCode statusCode, const TString& message) {
UNIT_ASSERT_C(!ExpectedCommonError, "Can not add existing common error");
ExpectedCommonError = {statusCode, message};
}
Expand Down Expand Up @@ -81,8 +80,8 @@ class TBaseParserFixture : public TBaseFixture {
const TVector<TSchemaColumn> Columns;
const TCallback Callback;

std::optional<std::pair<TStatus::TStatucCode, TString>> ExpectedCommonError;
std::unordered_map<ui64, std::pair<TStatus::TStatucCode, TString>> ExpectedErrors;
std::optional<std::pair<TStatus::TStatusCode, TString>> ExpectedCommonError;
std::unordered_map<ui64, std::pair<TStatus::TStatusCode, TString>> ExpectedErrors;
};

public:
Expand Down Expand Up @@ -126,13 +125,13 @@ class TBaseParserFixture : public TBaseFixture {
Parser->ParseMessages({GetMessage(offset, data)});
}

void CheckColumnError(const TString& data, ui64 columnId, TStatus::TStatucCode statusCode, const TString& message) {
void CheckColumnError(const TString& data, ui64 columnId, TStatus::TStatusCode statusCode, const TString& message) {
ExpectedBatches++;
ParserHandler->ExpectColumnError(columnId, statusCode, message);
Parser->ParseMessages({GetMessage(ParserHandler->CurrentOffset, data)});
}

void CheckBatchError(const TString& data, TStatus::TStatucCode statusCode, const TString& message) {
void CheckBatchError(const TString& data, TStatus::TStatusCode statusCode, const TString& message) {
ExpectedBatches++;
ParserHandler->ExpectCommonError(statusCode, message);
Parser->ParseMessages({GetMessage(ParserHandler->CurrentOffset, data)});
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "row_dispatcher.h"
#include "format_handler/common/common.h"

#include "actors_factory.h"
#include "coordinator.h"
#include "leader_election.h"

#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand All @@ -15,9 +17,8 @@
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/mon/mon.h>

#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/row_dispatcher/leader_election.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>
#include <ydb/core/fq/libs/row_dispatcher/protos/events.pb.h>

#include <util/generic/queue.h>
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#pragma once

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include "events/data_plane.h"

#include <ydb/library/actors/core/actor.h>

#include <memory>

Expand Down
11 changes: 6 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#include "topic_session.h"

#include "format_handler/format_handler.h"

#include <ydb/core/fq/libs/actors/logging/log.h>

#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/dq/runtime/dq_async_stats.h>

#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

#include <yql/essentials/public/issue/yql_issue_message.h>
Expand Down Expand Up @@ -302,6 +302,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
IgnoreFunc(NFq::TEvRowDispatcher::TEvStartSession);
IgnoreFunc(NFq::TEvRowDispatcher::TEvStopSession);
IgnoreFunc(NFq::TEvPrivate::TEvSendStatisticToRowDispatcher);
IgnoreFunc(NFq::TEvPrivate::TEvRefreshFormatHandlers);
})
};

Expand Down
7 changes: 2 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
#pragma once

#include "format_handler/common/common.h"

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

#include <ydb/library/actors/core/actor.h>

#include <memory>

namespace NFq {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class TFixture : public NTests::TBaseFixture {
}
}

void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatucCode statusCode, TString message = "") {
void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatusCode statusCode, TString message = "") {
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvSessionError>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId);
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def test_scheme_error(self, kikimr, client):

client.wait_query_status(query_id, fq.QueryMeta.FAILED)
issues = str(client.describe_query(query_id).result.query.issue)
assert "Cannot parse JSON string" in issues, "Incorrect Issues: " + issues
assert "Failed to parse json message for offset" in issues, "Incorrect Issues: " + issues

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 0)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
Expand Down

0 comments on commit 4fb6241

Please sign in to comment.