Skip to content

Commit

Permalink
YQ-3560 RowDispatcher: local mode to use in dqrun (ydb-platform#10072)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Oct 8, 2024
1 parent 61ddf66 commit dc9f405
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "ydb/core/fq/libs/config/protos/storage.proto";
message TRowDispatcherCoordinatorConfig {
TYdbStorageConfig Database = 1;
string CoordinationNodePath = 2;
bool LocalMode = 3; // Use only local row_dispatcher.
}
message TRowDispatcherConfig {
bool Enabled = 1;
Expand All @@ -19,5 +20,4 @@ message TRowDispatcherConfig {
uint64 MaxSessionUsedMemory = 4;
bool WithoutConsumer = 5;
TRowDispatcherCoordinatorConfig Coordinator = 6;

}
1 change: 0 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ void Init(
if (protoConfig.GetRowDispatcher().GetEnabled()) {
auto rowDispatcher = NFq::NewRowDispatcherService(
protoConfig.GetRowDispatcher(),
protoConfig.GetCommon(),
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/leader_election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ void TLeaderElection::Bootstrap() {
Become(&TLeaderElection::StateFunc);
LogPrefix = "TLeaderElection " + SelfId().ToString() + " ";
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString());
if (Config.GetLocalMode()) {
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId));
return;
}
ProcessState();
}

Expand Down
6 changes: 0 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {


NConfig::TRowDispatcherConfig Config;
NConfig::TCommonConfig CommonConfig;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
Expand Down Expand Up @@ -171,7 +170,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
public:
explicit TRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand Down Expand Up @@ -234,15 +232,13 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

TRowDispatcher::TRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters)
: Config(config)
, CommonConfig(commonConfig)
, CredentialsProviderFactory(credentialsProviderFactory)
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
Expand Down Expand Up @@ -586,7 +582,6 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintState::TPtr&) {

std::unique_ptr<NActors::IActor> NewRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand All @@ -596,7 +591,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
{
return std::unique_ptr<NActors::IActor>(new TRowDispatcher(
config,
commonConfig,
credentialsProviderFactory,
yqSharedResources,
credentialsFactory,
Expand Down
1 change: 0 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace NFq {

std::unique_ptr<NActors::IActor> NewRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ using namespace NActors;

std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const NConfig::TRowDispatcherConfig& config,
const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand All @@ -20,7 +19,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
{
return NewRowDispatcher(
config,
commonConfig,
credentialsProviderFactory,
yqSharedResources,
credentialsFactory,
Expand Down
1 change: 0 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace NFq {

std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const NConfig::TRowDispatcherConfig& config,
const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand Down
23 changes: 18 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ class TFixture : public NUnitTest::TBaseFixture {
Runtime.Initialize(app->Unwrap());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
YqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));

RowDispatcher = Runtime.AllocateEdgeActor();
Coordinator1 = Runtime.AllocateEdgeActor();
Coordinator2 = Runtime.AllocateEdgeActor();
Coordinator3 = Runtime.AllocateEdgeActor();
}

void Init(bool localMode = false) {
NConfig::TRowDispatcherCoordinatorConfig config;
config.SetCoordinationNodePath("row_dispatcher");
config.SetLocalMode(localMode);
auto& database = *config.MutableDatabase();
database.SetEndpoint(GetEnv("YDB_ENDPOINT"));
database.SetDatabase(GetEnv("YDB_DATABASE"));
Expand All @@ -42,7 +45,7 @@ class TFixture : public NUnitTest::TBaseFixture {
Coordinator1,
config,
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
YqSharedResources,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>()
).release());
Expand All @@ -52,7 +55,7 @@ class TFixture : public NUnitTest::TBaseFixture {
Coordinator2,
config,
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
YqSharedResources,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>()
).release());
Expand All @@ -62,7 +65,7 @@ class TFixture : public NUnitTest::TBaseFixture {
Coordinator3,
config,
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
YqSharedResources,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>()
).release());
Expand Down Expand Up @@ -95,10 +98,12 @@ class TFixture : public NUnitTest::TBaseFixture {
NActors::TActorId Coordinator2;
NActors::TActorId Coordinator3;
NActors::TActorId LeaderDetector;
TYqSharedResources::TPtr YqSharedResources;
};

Y_UNIT_TEST_SUITE(LeaderElectionTests) {
Y_UNIT_TEST_F(Test1, TFixture) {
Init();

auto coordinatorId1 = ExpectCoordinatorChanged();
auto coordinatorId2 = ExpectCoordinatorChanged();
Expand Down Expand Up @@ -134,7 +139,15 @@ Y_UNIT_TEST_SUITE(LeaderElectionTests) {
auto coordinatorId6 = ExpectCoordinatorChanged();
UNIT_ASSERT(coordinatorId6 != coordinatorId4);
}
}

Y_UNIT_TEST_F(TestLocalMode, TFixture) {
Init(true);
auto coordinatorId1 = ExpectCoordinatorChanged();
auto coordinatorId2 = ExpectCoordinatorChanged();
auto coordinatorId3 = ExpectCoordinatorChanged();
TSet<NActors::TActorId> set {coordinatorId1, coordinatorId2, coordinatorId3};
UNIT_ASSERT(set.size() == 3);
}
}

}
2 changes: 0 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class TFixture : public NUnitTest::TBaseFixture {
database.SetDatabase("YDB_DATABASE");
database.SetToken("");

NConfig::TCommonConfig commonConfig;
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));

Expand All @@ -74,7 +73,6 @@ class TFixture : public NUnitTest::TBaseFixture {

RowDispatcher = Runtime.Register(NewRowDispatcher(
config,
commonConfig,
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TBlockingEQueue {

class TFileTopicReadSession : public NYdb::NTopic::IReadSession {

constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);
constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);

public:
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = ""):
Expand Down Expand Up @@ -182,10 +182,14 @@ constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);
TString rawMsg;
TVector<TMessage> msgs;
size_t size = 0;
ui64 maxBatchRowSize = 100;

while (size_t read = fi.ReadLine(rawMsg)) {
msgs.emplace_back(MakeNextMessage(rawMsg));
MsgOffset_++;
if (!maxBatchRowSize--) {
break;
}
size += rawMsg.size();
}
if (!msgs.empty()) {
Expand Down

0 comments on commit dc9f405

Please sign in to comment.