Skip to content

Commit

Permalink
test(kqp): prepare unit tests for enabling stream lookup join (ydb-pl…
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored Jun 26, 2024
1 parent 2fc2f71 commit d5dd2aa
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 75 deletions.
1 change: 0 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(
const TDqJoin& join,
TExprBase leftInput,
const TPrefixLookup& rightLookup,

const TKqpMatchReadResult& rightReadMatch,
TExprContext& ctx)
{
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,10 @@ void TestKeyCastForAllJoinTypes(TSession session, const TString& leftTable, cons
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
return;
}
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
} else if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
Expand Down
96 changes: 62 additions & 34 deletions ydb/core/kqp/ut/join/kqp_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,25 +249,39 @@ Y_UNIT_TEST_SUITE(KqpJoin) {

auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);

for (const auto& tableStat : stats.query_phases(0).table_access()) {
if (tableStat.name() == "/Root/Join1_2") {
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 1);
} else {
UNIT_ASSERT_VALUES_EQUAL(tableStat.name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 8);
}
}
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8);

ui32 index = 1;
if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
index = 2;
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 1);
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8);

ui32 index = 1;
if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
index = 2;
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 1);
}

Y_UNIT_TEST(IdxLookupPartialLeftPredicate) {
Expand Down Expand Up @@ -299,25 +313,39 @@ Y_UNIT_TEST_SUITE(KqpJoin) {
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
Cerr << stats.DebugString() << Endl;

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);

for (const auto& tableStat : stats.query_phases(0).table_access()) {
if (tableStat.name() == "/Root/Join1_2") {
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 3);
} else {
UNIT_ASSERT_VALUES_EQUAL(tableStat.name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 8);
}
}
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8);

ui32 index = 1;
if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
index = 2;
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 3);
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1_1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 8);

ui32 index = 1;
if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
index = 2;
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join1_2");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 3);
}

Y_UNIT_TEST(IdxLookupPartialWithTempTable) {
Expand Down
72 changes: 50 additions & 22 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1465,24 +1465,37 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
])", FormatResultSetYson(result.GetResultSet(0)));

auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);

for (const auto& tableStat : stats.query_phases(0).table_access()) {
if (tableStat.name() == "/Root/Join2") {
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 7);
} else {
UNIT_ASSERT_VALUES_EQUAL(tableStat.name(), "/Root/Join1");
}
}
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
}
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Join1");

ui32 index = 1;
if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 0);
index = 2;
}
ui32 index = 1;
if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 0);
index = 2;
}

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join2");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 4);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Join2");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), 4);
}
}

Y_UNIT_TEST(JoinIdxLookupWithPredicate) {
Expand Down Expand Up @@ -3481,9 +3494,9 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(StreamLookupForDataQuery) {
Y_UNIT_TEST_TWIN(StreamLookupForDataQuery, StreamLookupJoin) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(appConfig));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand Down Expand Up @@ -3520,12 +3533,27 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT(streamLookup.IsDefined());

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 2);

if (StreamLookupJoin) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);

for (const auto& tableStat : stats.query_phases(0).table_access()) {
if (tableStat.name() == "/Root/EightShard") {
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 29);
} else {
UNIT_ASSERT_VALUES_EQUAL(tableStat.name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(tableStat.reads().rows(), 2);
}
}
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/EightShard");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).reads().rows(), 2);
}
}

{
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,9 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
WHERE t1.Key = $key;
)"), params);

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
} else if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
Expand All @@ -634,7 +636,9 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
WHERE t1.Key = $key;
)"), params);

if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
} else if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/query/kqp_explain_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
Y_UNIT_TEST(MultiJoinCteLinks) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr{settings};
Expand Down
42 changes: 27 additions & 15 deletions ydb/core/kqp/ut/query/kqp_stats_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ Y_UNIT_TEST(JoinNoStatsScan) {

template <typename Iterator>
TCollectedStreamResult JoinStatsBasic(
std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter, bool StreamLookupJoin = false) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);

auto settings = TKikimrSettings()
.SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
Expand All @@ -120,15 +121,18 @@ TCollectedStreamResult JoinStatsBasic(
return res;
}

Y_UNIT_TEST(JoinStatsBasicYql) {
auto res = JoinStatsBasic<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator);
Y_UNIT_TEST_TWIN(JoinStatsBasicYql, StreamLookupJoin) {
auto res = JoinStatsBasic<NYdb::NScripting::TYqlResultPartIterator>(GetYqlStreamIterator, StreamLookupJoin);

UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 3);
if (res.QueryStats->query_phases(0).table_access(0).name() == "/Root/KeyValue") {
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/EightShard");
if (StreamLookupJoin) {
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access().size(), 2);
} else {
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(0).table_access(0).name(), "/Root/EightShard");
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(2).table_access(0).name(), "/Root/KeyValue");
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(res.QueryStats->query_phases(1).table_access(0).name(), "/Root/KeyValue");
}
}

Expand Down Expand Up @@ -401,9 +405,9 @@ Y_UNIT_TEST(StatsProfile) {
//UNIT_ASSERT_EQUAL(node2.GetMap().at("Stats").GetMapSafe().at("ComputeNodes").GetArraySafe().size(), 1);
}

Y_UNIT_TEST(StreamLookupStats) {
Y_UNIT_TEST_TWIN(StreamLookupStats, StreamLookupJoin) {
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(StreamLookupJoin);
TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(app));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
Expand All @@ -425,11 +429,19 @@ Y_UNIT_TEST(StreamLookupStats) {
UNIT_ASSERT(streamLookup.IsDefined());

auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TwoShard");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).partitions_count(), 1);

if (StreamLookupJoin) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).partitions_count(), 1);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TwoShard");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).partitions_count(), 1);
}

AssertTableStats(result, "/Root/TwoShard", {
.ExpectedReads = 2,
Expand Down

0 comments on commit d5dd2aa

Please sign in to comment.