Skip to content

Commit

Permalink
Fixed issues
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 2, 2024
1 parent c3f01ed commit 04db658
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 37 deletions.
76 changes: 43 additions & 33 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
NFq::NRowDispatcherProto::TEvStartSession Settings;
NActors::TActorId ReadActorId;
std::unique_ptr<TJsonFilter> Filter; // empty if no predicate
ui64 InFlightCompilationId = 0;
TQueue<std::pair<ui64, TString>> Buffer;
ui64 UnreadBytes = 0;
bool DataArrivedSent = false;
Expand Down Expand Up @@ -259,6 +260,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
void HandleNewEvents();
TInstant GetMinStartingMessageTimestamp() const;
void AddDataToClient(TClientsInfo& client, ui64 offset, const TString& json);
void StartClientSession(TClientsInfo& info);

std::pair<NYql::NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message);

Expand Down Expand Up @@ -369,11 +371,6 @@ void TTopicSession::SubscribeOnNextEvent() {
return;
}

if (!FiltersCompilation.InFlightCompilations.empty()) {
LOG_ROW_DISPATCHER_TRACE("In flight filters compilation: " << FiltersCompilation.InFlightCompilations.size() << ", skip yds event handling");
return;
}

if (Config.GetMaxSessionUsedMemory() && UnreadBytes > Config.GetMaxSessionUsedMemory()) {
LOG_ROW_DISPATCHER_TRACE("Too much used memory (" << UnreadBytes << " bytes), skip subscribing to WaitEvent()");
return;
Expand Down Expand Up @@ -556,11 +553,6 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
void TTopicSession::HandleNewEvents() {
ui64 handledEventsSize = 0;

if (!FiltersCompilation.InFlightCompilations.empty()) {
LOG_ROW_DISPATCHER_TRACE("In flight filters compilation: " << FiltersCompilation.InFlightCompilations.size() << ", skip yds event handling");
return;
}

for (ui64 i = 0; i < MaxHandledEventsCount; ++i) {
if (!ReadSession) {
return;
Expand Down Expand Up @@ -711,6 +703,9 @@ void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector<
LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << lastOffset);

for (auto& [actorId, info] : Clients) {
if (info.InFlightCompilationId) { // filter compilation in flight
continue;
}
if (info.NextMessageOffset && lastOffset < info.NextMessageOffset) { // the batch has already been processed
continue;
}
Expand Down Expand Up @@ -798,6 +793,28 @@ bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
return false;
}

void TTopicSession::StartClientSession(TClientsInfo& info) {
if (ReadSession) {
if (info.Settings.HasOffset() && info.Settings.GetOffset() <= LastMessageOffset) {
LOG_ROW_DISPATCHER_INFO("New client has less offset (" << info.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
Metrics.RestartSessionByOffsets->Inc();
++RestartSessionByOffsets;
info.RestartSessionByOffsetsByQuery->Inc();
StopReadSession();
}
}

if (Parser) {
// Parse remains data before changing parsing schema
DoParsing(true);
}
UpdateParser();

if (!ReadSession) {
Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession());
}
}

void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: "
<< ev->Get()->Record.GetSource().GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
Expand All @@ -810,10 +827,6 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
auto types = GetVector(ev->Get()->Record.GetSource().GetColumnTypes());

try {
if (Parser) {
// Parse remains data before adding new client
DoParsing(true);
}
auto queryGroup = Counters->GetSubgroup("queryId", ev->Get()->Record.GetQueryId());
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(TopicPath));
auto& clientInfo = Clients.emplace(
Expand Down Expand Up @@ -841,34 +854,23 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
{.EnabledLLVM = source.GetEnabledLLVM()}
);

const ui64 eventId = FiltersCompilation.FreeId++;
Y_ENSURE(FiltersCompilation.InFlightCompilations.emplace(eventId, ev->Sender).second, "Got duplicated compilation event id");
LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << eventId);
clientInfo.InFlightCompilationId = ++FiltersCompilation.FreeId;
Y_ENSURE(FiltersCompilation.InFlightCompilations.emplace(clientInfo.InFlightCompilationId, ev->Sender).second, "Got duplicated compilation event id");
LOG_ROW_DISPATCHER_TRACE("Send compile request with id " << clientInfo.InFlightCompilationId);

Send(CompileServiceActorId, clientInfo.Filter->GetCompileRequest().release(), 0, eventId);
Send(CompileServiceActorId, clientInfo.Filter->GetCompileRequest().release(), 0, clientInfo.InFlightCompilationId);
Metrics.InFlightCompileRequests->Inc();
} else {
ClientsWithoutPredicate.insert(ev->Sender);
}

if (ReadSession) {
if (clientInfo.Settings.HasOffset() && (clientInfo.Settings.GetOffset() <= LastMessageOffset)) {
LOG_ROW_DISPATCHER_INFO("New client has less offset (" << clientInfo.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
Metrics.RestartSessionByOffsets->Inc();
++RestartSessionByOffsets;
clientInfo.RestartSessionByOffsetsByQuery->Inc();
StopReadSession();
}
// In case of in flight compilation topic session will be checked after getting compile response
StartClientSession(clientInfo);
}
} catch (...) {
FatalError("Adding new client failed, got unexpected exception: " + CurrentExceptionMessage(), nullptr, true, Nothing());
}
ConsumerName = ev->Get()->Record.GetSource().GetConsumerName();
UpdateParser();
SendStatisticToRowDispatcher();
if (!ReadSession) {
Schedule(TDuration::Seconds(Config.GetTimeoutBeforeStartSessionSec()), new NFq::TEvPrivate::TEvCreateSession());
}
}

void TTopicSession::AddDataToClient(TClientsInfo& info, ui64 offset, const TString& json) {
Expand Down Expand Up @@ -1117,9 +1119,17 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& e
LOG_ROW_DISPATCHER_TRACE("Compile response ignored for id " << ev->Cookie << ", client with id " << clientId << " not found");
return;
}
if (const auto& filter = clientIt->second.Filter) {
filter->OnCompileResponse(std::move(ev));

auto& clientInfo = clientIt->second;
if (ev->Cookie != clientInfo.InFlightCompilationId) {
LOG_ROW_DISPATCHER_TRACE("Outdated compiler response ignored for id " << ev->Cookie << ", client with id " << clientId << " changed");
return;
}

Y_ENSURE(clientInfo.Filter, "Unexpected completion response for client without filter");
clientInfo.Filter->OnCompileResponse(std::move(ev));
clientInfo.InFlightCompilationId = 0;
StartClientSession(clientInfo);
}

TString TTopicSession::GetAnyQueryIdByFieldName(const TString& fieldName) {
Expand Down
51 changes: 47 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>

#include <yql/essentials/public/purecalc/common/interface.h>

namespace {

using namespace NKikimr;
Expand All @@ -24,6 +26,38 @@ using namespace NYql::NDq;
const ui64 TimeoutBeforeStartSessionSec = 3;
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;

class TPurecalcCompileServiceMock : public NActors::TActor<TPurecalcCompileServiceMock> {
using TBase = NActors::TActor<TPurecalcCompileServiceMock>;

public:
TPurecalcCompileServiceMock(TActorId owner)
: TBase(&TPurecalcCompileServiceMock::StateFunc)
, Owner(owner)
, ProgramFactory(NYql::NPureCalc::MakeProgramFactory())
{}

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
)

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);

try {
programHolder->CreateProgram(ProgramFactory);
} catch (const NYql::NPureCalc::TCompileError& e) {
UNIT_ASSERT_C(false, "Failed to compile purecalc filter: sql: " << e.GetYql() << ", error: " << e.GetIssues());
}

Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
Send(Owner, new NActors::TEvents::TEvPing());
}

private:
const TActorId Owner;
const NYql::NPureCalc::IProgramFactoryPtr ProgramFactory;
};

class TFixture : public NUnitTest::TBaseFixture {
public:
TFixture()
Expand Down Expand Up @@ -60,7 +94,8 @@ class TFixture : public NUnitTest::TBaseFixture {
std::make_shared<NYql::TPqGatewayConfig>(),
nullptr);

const auto compileServiceActorId = Runtime.Register(NRowDispatcher::CreatePurecalcCompileService());
CompileNotifier = Runtime.AllocateEdgeActor();
const auto compileServiceActorId = Runtime.Register(new TPurecalcCompileServiceMock(CompileNotifier));

TopicSession = Runtime.Register(NewTopicSession(
topicPath,
Expand All @@ -86,7 +121,7 @@ class TFixture : public NUnitTest::TBaseFixture {
void TearDown(NUnitTest::TTestContext& /* context */) override {
}

void StartSession(TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source, TMaybe<ui64> readOffset = Nothing()) {
void StartSession(TActorId readActorId, const NYql::NPq::NProto::TDqPqTopicSource& source, TMaybe<ui64> readOffset = Nothing(), bool expectedError = false) {
auto event = new NFq::TEvRowDispatcher::TEvStartSession(
source,
PartitionId,
Expand All @@ -95,6 +130,13 @@ class TFixture : public NUnitTest::TBaseFixture {
0, // StartingMessageTimestamp;
"QueryId");
Runtime.Send(new IEventHandle(TopicSession, readActorId, event));

const auto& predicate = source.GetPredicate();
if (predicate && !expectedError) {
// Wait predicate compilation
const auto ping = Runtime.GrabEdgeEvent<NActors::TEvents::TEvPing>(CompileNotifier);
UNIT_ASSERT_C(ping, "Compilation is not performed for predicate: " << predicate);
}
}

NYql::NPq::NProto::TDqPqTopicSource BuildSource(TString topic, bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) {
Expand Down Expand Up @@ -177,6 +219,7 @@ class TFixture : public NUnitTest::TBaseFixture {
TActorSystemStub ActorSystemStub;
NActors::TActorId TopicSession;
NActors::TActorId RowDispatcherActorId;
NActors::TActorId CompileNotifier;
NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")));
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
NActors::TActorId ReadActorId1;
Expand Down Expand Up @@ -208,7 +251,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
ExpectStatisticToReadActor({ReadActorId1, ReadActorId2});

auto source2 = BuildSource(topicName, false, "OtherConsumer");
StartSession(ReadActorId3, source2);
StartSession(ReadActorId3, source2, Nothing(), true);
ExpectSessionError(ReadActorId3, "Use the same consumer");

StopSession(ReadActorId1, source);
Expand Down Expand Up @@ -484,7 +527,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
auto source2 = BuildSource(topicName);
source2.AddColumns("field1");
source2.AddColumnTypes("[DataType; String]");
StartSession(ReadActorId2, source2);
StartSession(ReadActorId2, source2, Nothing(), true);
ExpectSessionError(ReadActorId2, "Use the same column type in all queries via RD, current type for column `field1` is [OptionalType; [DataType; String]] (requested type is [DataType; String])");
}
}
Expand Down

0 comments on commit 04db658

Please sign in to comment.