Skip to content

Commit

Permalink
Fixed topic_session_ut.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 7, 2024
1 parent 24a482b commit 7ca3482
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 99 deletions.
14 changes: 11 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void OnFilteredData(ui64 rowId) override {
const ui64 offset = Self.Offsets->at(rowId);
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {
LOG_ROW_DISPATCHER_TRACE("OnFilteredData, skip historical offset: " << offset << ", next message offset: " << *nextOffset);
return;
}

Y_DEFER {
// Values allocated on parser allocator and should be released
FilteredRow.assign(Columns.size(), NYql::NUdf::TUnboxedValue());
Expand All @@ -218,8 +224,6 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId);
}
DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size());

const ui64 offset = Self.Offsets->at(rowId);
FilteredOffsets.emplace_back(offset);

const ui64 newPackerSize = DataPacker->PackedSizeEstimate();
Expand Down Expand Up @@ -555,7 +559,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
//// ITopicFormatHandler::TDestroy

void ITopicFormatHandler::TDestroy::Destroy(ITopicFormatHandler* handler) {
NActors::TActivationContext::ActorSystem()->Send(handler->GetSelfId(), new NActors::TEvents::TEvPoison());
if (NActors::TlsActivationContext) {
NActors::TActivationContext::ActorSystem()->Send(handler->GetSelfId(), new NActors::TEvents::TEvPoison());
} else {
// Destroy from not AS thread my be caused only in case AS destruction (so handler will be deleted)
}
}

ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,28 @@ void SegmentationFaultHandler(int) {

//// TBaseFixture::ICell

class TOptionalCell : public TBaseFixture::ICell {
public:
TOptionalCell(ICell::TPtr value)
: Value(value)
{}

public:
TString GetType() const override {
return TStringBuilder() << "[OptionalType; " << Value->GetType() << "]";
}

void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override {
if (!parsedValue) {
return;
}
Value->Validate(parsedValue.GetOptionalValue());
}

private:
const ICell::TPtr Value;
};

class TStringSell : public TBaseFixture::ICell {
public:
TStringSell(const TString& value)
Expand Down Expand Up @@ -97,21 +119,29 @@ class TUint64Sell : public TBaseFixture::ICell {

//// TBaseFixture::TRow

TBaseFixture::TRow& TBaseFixture::TRow::AddCell(ICell::TPtr cell) {
TBaseFixture::TRow& TBaseFixture::TRow::AddCell(ICell::TPtr cell, bool optional) {
if (optional) {
cell = MakeIntrusive<TOptionalCell>(cell);
}

Cells.emplace_back(cell);
return *this;
}

TBaseFixture::TRow& TBaseFixture::TRow::AddString(const TString& value) {
return AddCell(MakeIntrusive<TStringSell>(value));
TBaseFixture::TRow& TBaseFixture::TRow::AddString(const TString& value, bool optional) {
return AddCell(MakeIntrusive<TStringSell>(value), optional);
}

TBaseFixture::TRow& TBaseFixture::TRow::AddUint64(ui64 value) {
return AddCell(MakeIntrusive<TUint64Sell>(value));
TBaseFixture::TRow& TBaseFixture::TRow::AddUint64(ui64 value, bool optional) {
return AddCell(MakeIntrusive<TUint64Sell>(value), optional);
}

//// TBaseFixture::TBatch

TBaseFixture::TBatch::TBatch(std::initializer_list<TRow> rows)
: Rows(rows)
{}

TBaseFixture::TBatch& TBaseFixture::TBatch::AddRow(TRow row) {
Rows.emplace_back(row);
return *this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
public:
TRow() = default;

TRow& AddCell(ICell::TPtr cell);
TRow& AddString(const TString& value);
TRow& AddUint64(ui64 value);
TRow& AddCell(ICell::TPtr cell, bool optional);

TRow& AddString(const TString& value, bool optional = false);
TRow& AddUint64(ui64 value, bool optional = false);

public:
TVector<ICell::TPtr> Cells;
Expand All @@ -37,6 +38,7 @@ class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
class TBatch {
public:
TBatch() = default;
TBatch(std::initializer_list<TRow> rows);

TBatch& AddRow(TRow row);

Expand Down
6 changes: 2 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
}

void AddDataToClient(ui64 offset, ui64 rowSize) override {
if (NextMessageOffset && offset < *NextMessageOffset) {
LOG_ROW_DISPATCHER_TRACE("AddDataToClient " << ReadActorId << " skip historical offset: " << offset << ", next message offset: " << *NextMessageOffset);
return;
}
Y_ENSURE(!NextMessageOffset || offset >= *NextMessageOffset, "Unexpected historical offset");

LOG_ROW_DISPATCHER_TRACE("AddDataToClient to " << ReadActorId << ", offset: " << offset << ", serialized size: " << rowSize);

Expand Down Expand Up @@ -363,6 +360,7 @@ void TTopicSession::Bootstrap() {
void TTopicSession::PassAway() {
LOG_ROW_DISPATCHER_DEBUG("PassAway");
StopReadSession();
FormatHandlers.clear();
TBase::PassAway();
}

Expand Down
103 changes: 19 additions & 84 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,68 +26,10 @@ using namespace NYql::NDq;
const ui64 TimeoutBeforeStartSessionSec = 3;
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;

class TPurecalcCompileServiceMock : public NActors::TActor<TPurecalcCompileServiceMock> {
using TBase = NActors::TActor<TPurecalcCompileServiceMock>;

public:
TPurecalcCompileServiceMock(TActorId owner)
: TBase(&TPurecalcCompileServiceMock::StateFunc)
, Owner(owner)
, ProgramFactory(NYql::NPureCalc::MakeProgramFactory())
{}

STRICT_STFUNC(StateFunc,
hFunc(TEvRowDispatcher::TEvPurecalcCompileRequest, Handle);
)

void Handle(TEvRowDispatcher::TEvPurecalcCompileRequest::TPtr& ev) {
IProgramHolder::TPtr programHolder = std::move(ev->Get()->ProgramHolder);

try {
programHolder->CreateProgram(ProgramFactory);
} catch (const NYql::NPureCalc::TCompileError& e) {
UNIT_ASSERT_C(false, "Failed to compile purecalc filter: sql: " << e.GetYql() << ", error: " << e.GetIssues());
}

Send(ev->Sender, new TEvRowDispatcher::TEvPurecalcCompileResponse(std::move(programHolder)), 0, ev->Cookie);
Send(Owner, new NActors::TEvents::TEvPing());
}

private:
const TActorId Owner;
const NYql::NPureCalc::IProgramFactoryPtr ProgramFactory;
};

class TFixture : public NTests::TBaseFixture {
public:
using TBase = NTests::TBaseFixture;

struct TExpectedCell {
enum class EMessageType {
String,
Int
};

EMessageType Type;

ui64 ExperctedInt = 0;
TString ExperctedString = "";
};

struct TExpectedRow {
TVector<TExpectedCell> ExpectedCells;

TExpectedRow& AddString(const TString& expectedString) {
ExpectedCells.push_back(TExpectedCell{.Type = TExpectedCell::EMessageType::String, .ExperctedString = expectedString});
return *this;
}

TExpectedRow& AddInt(ui64 expectedInt) {
ExpectedCells.push_back(TExpectedCell{.Type = TExpectedCell::EMessageType::Int, .ExperctedInt = expectedInt});
return *this;
}
};

public:
void SetUp(NUnitTest::TTestContext& ctx) override {
TBase::SetUp(ctx);
Expand Down Expand Up @@ -115,7 +57,7 @@ class TFixture : public NTests::TBaseFixture {
nullptr);

CompileNotifier = Runtime.AllocateEdgeActor();
const auto compileServiceActorId = Runtime.Register(new TPurecalcCompileServiceMock(CompileNotifier));
const auto compileServiceActorId = Runtime.Register(CreatePurecalcCompileServiceMock(CompileNotifier));

TopicSession = Runtime.Register(NewTopicSession(
topicPath,
Expand Down Expand Up @@ -181,30 +123,17 @@ class TFixture : public NTests::TBaseFixture {
Runtime.Send(new IEventHandle(TopicSession, readActorId, event.release()));
}

void ExpectMessageBatch(NActors::TActorId readActorId, const std::vector<TExpectedRow>& expected) {
void ExpectMessageBatch(NActors::TActorId readActorId, const TBatch& expected) {
Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch()));

auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId);
UNIT_ASSERT_VALUES_EQUAL(expected.size(), eventHolder->Get()->Record.MessagesSize());
for (size_t i = 0; i < expected.size(); ++i) {
NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(i);

UNIT_ASSERT_VALUES_EQUAL_C(message.ColumnsPayloadSize(), expected[i].ExpectedCells.size(), "Message id: " << i);
for (size_t j = 0; const auto& expectedCell : expected[i].ExpectedCells) {
const auto& serializedColumn = eventHolder->Get()->GetPayload(message.GetColumnsPayload().Get(j++));
switch (expectedCell.Type) {
case TExpectedCell::EMessageType::String:
CheckStringColumn(serializedColumn, expectedCell.ExperctedString);
break;

case TExpectedCell::EMessageType::Int:
CheckIntColumn(serializedColumn, expectedCell.ExperctedInt);
break;
}
}
}
UNIT_ASSERT_VALUES_EQUAL(1, eventHolder->Get()->Record.MessagesSize());

NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(0);
UNIT_ASSERT_VALUES_EQUAL(message.OffsetsSize(), expected.Rows.size());
CheckMessageBatch(eventHolder->Get()->GetPayload(message.GetPayloadId()), expected);
}

void ExpectSessionError(NActors::TActorId readActorId, TStatus::TStatusCode statusCode, TString message = "") {
Expand Down Expand Up @@ -233,7 +162,13 @@ class TFixture : public NTests::TBaseFixture {
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvMessageBatch>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
UNIT_ASSERT(eventHolder.Get() != nullptr);
UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId);
return eventHolder->Get()->Record.MessagesSize();

size_t numberMessages = 0;
for (const auto& message : eventHolder->Get()->Record.GetMessages()) {
numberMessages += message.OffsetsSize();
}

return numberMessages;
}

void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds) {
Expand All @@ -246,8 +181,8 @@ class TFixture : public NTests::TBaseFixture {
}
}

static TExpectedRow JsonMessage(ui64 index) {
return TExpectedRow().AddInt(100 * index).AddString(TStringBuilder() << "value" << index);
static TRow JsonMessage(ui64 index) {
return TRow().AddUint64(100 * index).AddString(TStringBuilder() << "value" << index);
}

public:
Expand Down Expand Up @@ -371,10 +306,10 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
StartSession(ReadActorId2, source, 2);

ExpectNewDataArrived({ReadActorId1, ReadActorId2});
std::vector<TExpectedRow> expected1 = { JsonMessage(2), JsonMessage(3) };
TBatch expected1 = { JsonMessage(2), JsonMessage(3) };
ExpectMessageBatch(ReadActorId1, expected1);

std::vector<TExpectedRow> expected2 = { JsonMessage(3) };
TBatch expected2 = { JsonMessage(3) };
ExpectMessageBatch(ReadActorId2, expected2);

const std::vector<TString> data2 = { Json4 };
Expand Down Expand Up @@ -569,7 +504,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
TString json1 = "{\"dt\":100,\"field1\":\"str\",\"value\":\"value1\"}";
PQWrite({ json1 }, topicName);
ExpectNewDataArrived({ReadActorId1});
ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str") });
ExpectMessageBatch(ReadActorId1, { JsonMessage(1).AddString("str", true) });

auto source2 = BuildSource(topicName);
source2.AddColumns("field1");
Expand Down

0 comments on commit 7ca3482

Please sign in to comment.